
feat: Subscription Lifecycle Amendment#356

Open
samwillis wants to merge 32 commits into TanStack:main from thruflo:samwillis/sub-lifecycle

Conversation

@samwillis samwillis commented Mar 9, 2026

TL;DR

This PR extends the current subscribe() + send() connection mode PR by introducing a first-class subscription lifecycle in @tanstack/ai-client, separate from the existing request lifecycle. It also fixes two live-session correctness bugs in StreamProcessor that caused transient UI corruption under concurrent runs and reconnect-mid-generation.

It adds explicit APIs and state for connection behavior (subscribe, unsubscribe, isSubscribed, connectionStatus, sessionGenerating) while preserving backward compatibility for existing connect()-only adapters and existing request semantics. The stream processor is now run-aware — RUN_FINISHED only finalizes when all active runs complete — and reconnect-safe — resumed content for an existing message hydrates state rather than creating a duplicate.

Note: this PR is stacked on #286, but because that PR originates outside the main repo, this one cannot be marked as targeting it. This PR contains only the final 4 commits; the real diff can be seen here: https://github.com/thruflo/tanstack-ai/pull/3/changes


Why this amendment is needed

The current durable session direction adds long-lived streaming behavior, but request state (isLoading, status) and connection state are still easy to conflate.

That creates ambiguity in real apps:

  • In the current baseline, there is no explicit ChatClient.subscribe() API, and
    no automatic mount-time subscription behavior in hooks.
  • Subscription startup is effectively implicit: it begins on the first message
    request (sendMessage / append / reload), not as an explicit session start.
  • A request may complete while the live subscription should remain open.
  • A user may want to stop only the local request (stop) without tearing down the live connection.
  • UI frameworks need a stable way to expose and react to connection state independently of request state.

This amendment addresses that by making connection lifecycle explicit, observable, and controllable.


Proposed design overview

1) Two distinct lifecycles

Request lifecycle (existing, unchanged in spirit)

  • Driven by individual calls like sendMessage, append, reload
  • Surfaces:
    • isLoading
    • status: 'ready' | 'submitted' | 'streaming' | 'error'
  • chatClient.stop() cancels local in-flight request work

Connection lifecycle (new)

  • Driven by long-lived subscription state
  • Surfaces:
    • isSubscribed: boolean
    • connectionStatus: 'disconnected' | 'connecting' | 'connected' | 'error'
  • Managed via:
    • chatClient.subscribe()
    • chatClient.unsubscribe()

Session generation lifecycle (new)

  • Driven by stream run events (RUN_STARTED, RUN_FINISHED, RUN_ERROR)
  • Surfaces:
    • sessionGenerating: boolean
  • Unlike isLoading (request-local, only visible to the sender), sessionGenerating reflects shared generation activity visible to all subscribers — enabling typing indicators for non-sender tabs/devices/users.
  • Correctly handles concurrent runs: stays true until the last active run completes (tracked via Set<runId>).
  • Resets on unsubscribe(), connection replacement, and fatal subscription errors.
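
The run bookkeeping above can be sketched as a small tracker. This is an illustrative sketch, not the actual ChatClient internals: the event shapes are simplified and the SessionGenerationTracker name is invented for the example.

```typescript
// Sketch of the sessionGenerating bookkeeping; event shapes simplified
// (real AG-UI events carry more fields).
type RunEvent =
  | { type: 'RUN_STARTED'; runId: string }
  | { type: 'RUN_FINISHED'; runId: string }
  | { type: 'RUN_ERROR'; runId?: string }

class SessionGenerationTracker {
  private activeRuns = new Set<string>()
  private generating = false

  constructor(private onChange: (generating: boolean) => void) {}

  handle(event: RunEvent): void {
    switch (event.type) {
      case 'RUN_STARTED':
        this.activeRuns.add(event.runId)
        break
      case 'RUN_FINISHED':
        this.activeRuns.delete(event.runId)
        break
      case 'RUN_ERROR':
        // A session-level error (no runId) clears all active runs.
        if (event.runId === undefined) this.activeRuns.clear()
        else this.activeRuns.delete(event.runId)
        break
    }
    this.emit()
  }

  // Called on unsubscribe(), connection replacement, or fatal errors.
  reset(): void {
    this.activeRuns.clear()
    this.emit()
  }

  private emit(): void {
    const next = this.activeRuns.size > 0
    // Deduplicate: only fire the callback on actual state transitions.
    if (next !== this.generating) {
      this.generating = next
      this.onChange(next)
    }
  }
}
```

With two concurrent runs, the callback fires once when the first run starts and once when the last run finishes, which is the "stays true until the last active run completes" behavior described above.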

2) Public API additions (@tanstack/ai-client)

New public methods on ChatClient instance:

  • subscribe(options?: { restart?: boolean }): void
  • unsubscribe(): void
  • getIsSubscribed(): boolean
  • getConnectionStatus(): ConnectionStatus
  • getSessionGenerating(): boolean

New callback hooks on ChatClient instance:

  • onSubscriptionChange?: (isSubscribed: boolean) => void
  • onConnectionStatusChange?: (status: ConnectionStatus) => void
  • onSessionGeneratingChange?: (isGenerating: boolean) => void

New exported type:

  • ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'error'

3) Updated behavioral semantics

stop() semantics

stop() now affects only request lifecycle (local run cancellation).
It does not implicitly unsubscribe the connection loop.

Use unsubscribe() for full live teardown.

updateOptions({ connection }) semantics

On connection replacement:

  1. Abort in-flight request work
  2. Tear down previous subscription loop
  3. Reset subscription state to disconnected
  4. Re-subscribe only if the client was previously subscribed

This preserves "live intent" across adapter swaps without auto-subscribing clients that were previously disconnected.
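
The four replacement steps can be sketched as a pure function. The names below (replaceConnection, teardownSubscription, and so on) are hypothetical stand-ins for the actual ChatClient internals, shown only to make the ordering and the "live intent" check concrete.

```typescript
// Hypothetical sketch of connection replacement; not the real
// ChatClient implementation.
type Status = 'disconnected' | 'connecting' | 'connected' | 'error'

interface LiveState {
  status: Status
  subscribed: boolean
}

function replaceConnection(
  state: LiveState,
  abortInFlight: () => void,
  teardownSubscription: () => void,
  resubscribe: () => void,
): LiveState {
  const wasSubscribed = state.subscribed
  abortInFlight()        // 1. abort in-flight request work
  teardownSubscription() // 2. tear down the previous subscription loop
  // 3. reset subscription state to disconnected
  if (wasSubscribed) {
    resubscribe()        // 4. re-subscribe only if previously subscribed
    return { status: 'connecting', subscribed: true }
  }
  return { status: 'disconnected', subscribed: false }
}
```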


4) Adapter model and backward compatibility

ConnectionAdapter supports two modes:

  • Legacy request-scoped: connect(...)
  • Durable mode: subscribe(...) + send(...)

normalizeConnectionAdapter(...) internally adapts legacy connect() adapters into subscribe/send behavior so existing integrations continue to work.

This means no breaking API migration is required for existing users.
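
One way this adaptation could work, sketched with simplified interfaces (the real normalizeConnectionAdapter signatures differ): send() drives the legacy connect() request and pushes its chunks into a queue, while subscribe() drains that queue as a long-lived stream.

```typescript
// Simplified sketch of adapting a legacy connect() adapter into
// subscribe/send shape; not the actual @tanstack/ai-client types.
interface LegacyAdapter<T> {
  connect: (body: unknown) => AsyncIterable<T>
}

interface DurableAdapter<T> {
  subscribe: () => AsyncIterable<T>
  send: (body: unknown) => Promise<void>
}

function normalizeLegacyAdapter<T>(legacy: LegacyAdapter<T>): DurableAdapter<T> {
  const buffer: T[] = []
  let notify: (() => void) | null = null

  return {
    // send() runs the request-scoped connect() and queues its chunks.
    async send(body) {
      for await (const chunk of legacy.connect(body)) {
        buffer.push(chunk)
        notify?.()
      }
    },
    // subscribe() exposes the queue as one long-lived stream.
    async *subscribe() {
      while (true) {
        while (buffer.length > 0) yield buffer.shift() as T
        await new Promise<void>((resolve) => { notify = resolve })
      }
    },
  }
}
```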


What has been implemented

packages/typescript/ai-client

  • Added ConnectionStatus type and exports
  • Added subscription lifecycle state to ChatClient
  • Added new client methods:
    • subscribe
    • unsubscribe
    • getIsSubscribed
    • getConnectionStatus
  • Added lifecycle callbacks:
    • onSubscriptionChange
    • onConnectionStatusChange
    • onSessionGeneratingChange
  • Added sessionGenerating state derived from stream run events:
    • Tracks active runs via Set<runId> to correctly handle concurrent generations
    • RUN_STARTED adds to active set → sessionGenerating = true
    • RUN_FINISHED / RUN_ERROR removes from set → sessionGenerating = false only when all runs complete
    • RUN_ERROR without runId (session-level error) clears all active runs
    • Resets on unsubscribe(), connection replacement, and fatal subscription loop errors
    • Callback deduplication: only fires on actual state transitions
  • Refined stream cancellation internals so request cancellation and subscription cancellation are independently controlled
  • Updated stop() to preserve active subscription
  • Updated updateOptions({ connection }) to perform safe teardown/reset/re-subscribe
  • Added/expanded tests for lifecycle transitions, idempotency, restart behavior, callback emissions, error paths, concurrent runs, and session-level errors
  • Added integration tests for concurrent-run message integrity and reconnect dedupe

packages/typescript/ai (StreamProcessor)

  • Added activeRuns: Set<string> run tracking for concurrent run safety
  • RUN_STARTED now registers the run (previously ignored)
  • RUN_FINISHED only finalizes when all active runs complete (was: finalize globally on every RUN_FINISHED)
  • RUN_ERROR removes the errored run from tracking (with runId) or clears all (without)
  • ensureAssistantMessage() now hydrates state from existing messages on reconnect instead of creating duplicates
  • Added 11 tests covering concurrent runs (6) and reconnect dedupe (5)

Framework wrappers

Implemented parity across:

  • @tanstack/ai-react
  • @tanstack/ai-preact
  • @tanstack/ai-solid
  • @tanstack/ai-vue
  • @tanstack/ai-svelte

Changes include:

  • live?: boolean option
  • Returned reactive state:
    • isSubscribed
    • connectionStatus
    • sessionGenerating
  • Internal wiring of new lifecycle callbacks (including onSessionGeneratingChange)
  • Mount/unmount behavior for live mode

Usage examples

Example A: Explicit live lifecycle with subscribe/send adapter

import { ChatClient } from '@tanstack/ai-client'

const client = new ChatClient({
  connection, // subscribe/send adapter
  onSubscriptionChange: (v) => console.log('subscribed:', v),
  onConnectionStatusChange: (s) => console.log('connection:', s),
  onSessionGeneratingChange: (g) => console.log('generating:', g),
})

client.subscribe()
// isSubscribed: true
// connectionStatus: connecting -> connected (after first chunk)

await client.sendMessage('Hello')
// sessionGenerating: true (on RUN_STARTED) -> false (on RUN_FINISHED)
// request finishes, subscription remains active

client.stop()
// stops only local request activity; connection remains subscribed
// sessionGenerating follows stream truth — if a RUN_FINISHED arrives, it flips to false

client.unsubscribe()
// isSubscribed: false
// connectionStatus: disconnected
// sessionGenerating: false (reset)

Example B: Legacy connect() adapter still works

import { ChatClient, fetchServerSentEvents } from '@tanstack/ai-client'

const client = new ChatClient({
  connection: fetchServerSentEvents('/api/chat'), // connect() adapter
})

// You can still use normal request APIs:
await client.sendMessage('Hello')

// And now you can also explicitly manage connection lifecycle:
client.subscribe()
console.log(client.getIsSubscribed()) // true
console.log(client.getConnectionStatus()) // connecting/connected

Example C: React hook with automatic live lifecycle

import { useChat } from '@tanstack/ai-react'

function ChatScreen() {
  const {
    messages,
    sendMessage,
    isLoading,
    status,
    isSubscribed,
    connectionStatus,
    sessionGenerating,
  } = useChat({
    connection,
    live: true,
  })

  return (
    <div>
      <p>request status: {status}</p>
      <p>connection: {connectionStatus}</p>
      <p>subscribed: {String(isSubscribed)}</p>
      {/* Show typing indicator for any subscriber, not just the sender */}
      {sessionGenerating && <p>Assistant is typing...</p>}
      <button disabled={isLoading} onClick={() => sendMessage('Hi')}>
        Send
      </button>
      <pre>{JSON.stringify(messages, null, 2)}</pre>
    </div>
  )
}

Migration notes

  • Existing connect() adapter users are backward compatible.
  • Existing request APIs remain unchanged.
  • Consumers that need durable/live UX should:
    • use subscribe() / unsubscribe() directly, or
    • set live: true in framework hooks.
  • If you previously relied on stop() to fully disconnect, switch that logic to unsubscribe().

Stream processor correctness fixes

This PR also fixes two live-session correctness bugs visible under concurrent runs and reconnect-mid-run scenarios.

Problem A: Concurrent prompts corrupt transient UI

When two runs are active concurrently (e.g. two windows on the same chat session), RUN_FINISHED for one run called finalizeStream() globally — clearing activeMessageIds, force-completing all tool calls, and firing onStreamEnd. This killed the still-active run's in-flight state.

Problem B: Refresh during active generation causes transient double-streaming

On reconnect (e.g. page refresh during generation), TEXT_MESSAGE_CONTENT could arrive for a message that already exists in initialMessages/MESSAGES_SNAPSHOT. The processor's ensureAssistantMessage() created a duplicate message instead of binding to the existing one, causing the assistant text to appear twice transiently.

Fix A: Run-aware finalization (StreamProcessor)

  • Added activeRuns: Set<string> to track concurrent runs by runId.
  • Promoted RUN_STARTED from the ignored default case to a proper handler that registers the run.
  • handleRunFinishedEvent now removes the run from activeRuns and only finalizes when activeRuns.size === 0. One run finishing no longer kills another concurrent run's messages, tool calls, or stream state.
  • handleRunErrorEvent removes the errored run (or clears all runs if no runId).
  • Backward compatible: flows without RUN_STARTED still finalize normally.

Fix B: Reconnect dedupe / state hydration (StreamProcessor.ensureAssistantMessage)

  • Before creating a new message, ensureAssistantMessage now checks if a message with the given preferredId already exists in this.messages.
  • If found, it hydrates transient MessageStreamState from the existing message instead of duplicating it.
  • currentSegmentText and lastEmittedText are seeded from the existing message's last text part so incoming deltas append correctly (e.g. existing "Hello wor" + delta "ld!" → "Hello world!").
  • Does not fire onStreamStart for hydration (message already exists in UI).

Tests added

stream-processor.test.ts — 11 new tests:

  • Concurrent runs: RUN_FINISHED for one run doesn't finalize another, isDone stays false until last run, single-run backward compat, RUN_FINISHED without RUN_STARTED backward compat, tool calls not force-completed across runs, RUN_ERROR isolation
  • Reconnect dedupe: no duplicate on TEXT_MESSAGE_CONTENT for existing message, multi-delta append, tool-call-then-text resume, MESSAGES_SNAPSHOT + resume, no spurious onStreamStart

chat-client.test.ts — 2 new integration tests:

  • Concurrent runs via subscribe adapter: interleaved runs with stable message content
  • Reconnect with initialMessages: no duplicate message on resumed content

Residual limitations

  • No per-run message correlation in the protocol. AG-UI events don't carry runId on content events — only on RUN_STARTED/RUN_FINISHED. The processor defers all finalization until the last run completes. A protocol extension could enable finer-grained per-run cleanup.
  • hasError is global. If one concurrent run errors, the flag affects whitespace-only-message cleanup for all runs. Per-run error tracking is follow-up work.
  • Reconnect state hydration is heuristic. currentSegmentText is seeded from the existing message's last text part. Correct server behavior (proper deltas or MESSAGES_SNAPSHOT first) is assumed.

Summary by CodeRabbit

  • New Features

    • Live subscription mode with subscribe()/unsubscribe(), plus new public state: isSubscribed, connectionStatus, sessionGenerating.
    • New MESSAGES_SNAPSHOT event for batch message snapshots.
    • Framework bindings (React/Preact/Solid/Svelte/Vue) add live option and expose subscription/session state.
  • Improvements

    • More robust streaming: durable subscribe/send support, improved reconnects, concurrent-run handling, and reduced duplicate/resume content.
  • Tests

    • Expanded coverage for subscription lifecycle, reconnection, per-message streaming, and multi-run scenarios.

thruflo and others added 26 commits February 10, 2026 11:44
Widen TextMessageStartEvent.role to accept all message roles and add
optional parentMessageId to ToolCallStartEvent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace single-message instance variables with a Map<string, MessageStreamState>
keyed by messageId. Add explicit handlers for TEXT_MESSAGE_START, TEXT_MESSAGE_END,
and STATE_SNAPSHOT events. Route tool calls via toolCallToMessage mapping.

Maintains backward compat: startAssistantMessage() sets pendingManualMessageId
which TEXT_MESSAGE_START associates with. ensureAssistantMessage() auto-creates
state for streams without TEXT_MESSAGE_START.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add MessagesSnapshotEvent as a first-class AG-UI event type for
conversation hydration. Replace the previous STATE_SNAPSHOT handler
(which extracted messages from arbitrary state) with a dedicated
MESSAGES_SNAPSHOT handler that accepts a typed messages array.

- Add MessagesSnapshotEvent type to AGUIEventType and AGUIEvent unions
- Add MESSAGES_SNAPSHOT case in StreamProcessor.processChunk()
- Remove STATE_SNAPSHOT handler (falls through to default no-op)
- Fix onStreamEnd to fire per-message (not only when no active messages remain)
- Fix getActiveAssistantMessageId to return on first reverse match
- Fix ensureAssistantMessage to emit onStreamStart and onMessagesChange
- Add proposal docs for resumeable session support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…on model

Replace direct ConnectionAdapter usage in ChatClient with a SessionAdapter-based
subscription loop. When only a ConnectionAdapter is provided, it is wrapped in a
DefaultSessionAdapter internally. This enables persistent session support while
preserving existing timing semantics and backwards compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…unter

reload() now cancels the active stream (abort controllers, subscription,
processing promise) before starting a new one. A stream generation counter
prevents a superseded stream's async cleanup from clobbering the new
stream's state (abortController, isLoading, processor).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Guard against double onStreamEnd when RUN_FINISHED arrives before TEXT_MESSAGE_END
- Clear dead waiters on subscribe exit to prevent chunk loss on reconnection
- Reset transient processor state (messageStates, activeMessageIds, etc.) on MESSAGES_SNAPSHOT
- Remove optimistic startAssistantMessage() from streamResponse(); let stream events create the message naturally via TEXT_MESSAGE_START or ensureAssistantMessage()
- Clean up abort listeners on normal waiter resolution to prevent listener accumulation
- Make handleStepFinishedEvent use ensureAssistantMessage() for backward compat with streams that lack TEXT_MESSAGE_START

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r race

Reset processor stream state (prepareAssistantMessage) in streamResponse()
before the subscription loop, preventing stale messageStates from blocking
new assistant message creation on reload.

Rewrite createDefaultSession with per-subscribe queue isolation: each
subscribe() synchronously installs fresh buffer/waiters, drains pre-buffered
chunks via splice(0), and removes async cleanup that raced with new
subscription cycles.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove unnecessary `chunk.delta !== undefined` condition (delta is
always a string on TextMessageContentEvent) and remove redundant `!`
non-null assertion inside an already-narrowed `if` block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix import ordering: move value import `createDefaultSession` above
type-only imports. Convert shorthand method signatures to function
property style in the SessionAdapter interface.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wrap createDefaultSession's send() in try/catch and push a RUN_ERROR
AG-UI event to the queue before re-throwing, so subscribe() consumers
learn about connection failures through the standard protocol. Also
resolve processingResolve on RUN_ERROR in consumeSubscription (same as
RUN_FINISHED) to prevent hangs.

Tests updated: error assertions now check message content rather than
referential identity, since errors flowing through RUN_ERROR create
new Error instances from the message string.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The stream processor mapped 'tool' to 'assistant' for UIMessage but
stored the raw 'tool' role in MessageStreamState. This caused
getActiveAssistantMessageId() and getCurrentAssistantMessageId() to miss
tool-role messages, so subsequent stream events couldn't attach to the
existing message. Now the uiRole mapping is applied consistently across
all three cases in handleTextMessageStartEvent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When chunk.delta was undefined, the check `chunk.delta !== ''` evaluated
to true, causing "undefined" to be concatenated into nextText. Use
`chunk.delta ?? ''` to normalize before comparison, matching the safe
pattern already used in handleToolCallArgsEvent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The no-unnecessary-condition rule flags ?? since TypeScript types delta
as string. Using || preserves runtime safety and matches existing patterns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
handleMessagesSnapshotEvent was clearing maps but not resetting isDone,
hasError, and finishReason. Use resetStreamState() which handles all of
these, ensuring finalizeStream() sees fresh state after a snapshot.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Squash all post-groundwork changes into a single commit that completes the connection adapter unification, stream lifecycle hardening, and restoration of ai stream snapshot/state behavior.

Co-authored-by: Cursor <cursoragent@cursor.com>
…on-support

Made-with: Cursor

# Conflicts:
#	packages/typescript/ai-client/src/chat-client.ts
#	packages/typescript/ai-client/tests/chat-client.test.ts
Keep early stream exits aligned with the Promise<boolean> contract so repo-wide type checks pass after the merge from main.

Made-with: Cursor
coderabbitai bot commented Mar 9, 2026

📝 Walkthrough

Adds a subscription-based streaming model: new SubscribeConnectionAdapter and normalizeConnectionAdapter, subscription lifecycle (subscribe/unsubscribe), connectionStatus/sessionGenerating tracking, background subscription loop in ChatClient, and per-message StreamProcessor refactor to support concurrent runs and message-scoped state.

Changes

Cohort / File(s) Summary
Chat client core
packages/typescript/ai-client/src/chat-client.ts, packages/typescript/ai-client/src/connection-adapters.ts
Introduce SubscribeConnectionAdapter, normalizeConnectionAdapter; change connection typing to support connect/subscribe modes; implement subscribe/unsubscribe lifecycle, background subscription loop, connectionStatus/isSubscribed/sessionGenerating state and lifecycle helpers.
Public types & exports
packages/typescript/ai-client/src/index.ts, packages/typescript/ai-client/src/types.ts
Export ConnectionStatus, ConnectConnectionAdapter, SubscribeConnectionAdapter; extend ChatClientOptions with onSubscriptionChange, onConnectionStatusChange, onSessionGeneratingChange.
Stream processor (core)
packages/typescript/ai/src/activities/chat/stream/processor.ts, packages/typescript/ai/src/activities/chat/stream/types.ts, packages/typescript/ai/src/types.ts
Refactor to per-message MessageStreamState; add tracking for active runs, per-message tool calls, MESSAGES_SNAPSHOT event, and startAssistantMessage(messageId?: string); remove global currentAssistantMessageId.
Clients & generation APIs
packages/typescript/ai-client/src/generation-client.ts, .../video-generation-client.ts, various use-* / create-* files across react/preact/solid/svelte/vue
Rename connection-type usage to ConnectConnectionAdapter for connect-mode APIs and update signatures/JSDoc across generation/media hooks and components.
Framework bindings
packages/typescript/ai-preact/..., packages/typescript/ai-react/..., packages/typescript/ai-solid/..., packages/typescript/ai-svelte/..., packages/typescript/ai-vue/...
Expose isSubscribed, connectionStatus, sessionGenerating in hooks/adapters; add live option to auto-subscribe; wire ChatClient callbacks into framework state; update tests and test-utils.
Tests - ai-client & adapters
packages/typescript/ai-client/tests/chat-client.test.ts, .../connection-adapters.test.ts, .../test-utils.ts
Add extensive tests for subscribe/send lifecycle, connection status transitions, error/run concurrency, and normalizeConnectionAdapter behavior; update mocks to ConnectConnectionAdapter.
Tests - stream processor
packages/typescript/ai/tests/stream-processor.test.ts
Expand coverage for per-message streaming, TEXT_MESSAGE_START/END semantics, MESSAGES_SNAPSHOT, tool-call interleaving, reconnection/dedupe and finalize logic.
Tests - framework bindings
framework tests across react/preact/solid/svelte/vue (tests/*)
Update tests to assert initial subscription state, live auto-subscribe behavior, and sessionGenerating transitions; update test-utils to surface new state fields.
Changelogs / changesets
.changeset/*.md
Document subscription lifecycle, generation transport changes, and stream-processor improvements.

Sequence Diagram(s)

sequenceDiagram
    participant UI as UI/App
    participant Client as ChatClient
    participant Adapter as SubscribeConnectionAdapter
    participant Processor as StreamProcessor

    UI->>Client: subscribe()
    activate Client
    Client->>Client: setIsSubscribed(true)\nsetConnectionStatus('connecting')
    Client->>Adapter: subscribe(abortSignal)
    activate Adapter
    Adapter-->>Client: stream chunks (AsyncIterable)
    loop per chunk
        Client->>Processor: processChunk(chunk)
        activate Processor
        alt RUN_STARTED
            Processor-->>Client: onSessionGeneratingChange(true)
        else TEXT_MESSAGE_CONTENT
            Processor-->>Client: onMessagesChange()
        else RUN_FINISHED/RUN_ERROR
            Processor-->>Client: onSessionGeneratingChange(false)\nonMessagesChange()
        end
        deactivate Processor
    end
    Client->>Client: setConnectionStatus('connected')
    UI->>Client: unsubscribe()
    Client->>Client: cancelInFlightStream()\nsetIsSubscribed(false)
    Client->>Adapter: abort()
    deactivate Adapter
    deactivate Client
sequenceDiagram
    participant Processor as StreamProcessor
    participant MsgState as MessageStreamState
    participant EventBus as Event Emitter

    Processor->>MsgState: TEXT_MESSAGE_START(messageId?)
    Processor->>MsgState: createMessageState(messageId)
    loop per run/message
        Processor->>MsgState: RUN_STARTED(runId)
        Processor->>EventBus: emit sessionGenerating(true)
        Processor->>MsgState: TEXT_MESSAGE_CONTENT -> append segment\nemit messagesChange
        Processor->>MsgState: TOOL_CALL_START/ARGS/END -> track per-message tool call
        Processor->>EventBus: emit tool-call/updates
        Processor->>MsgState: RUN_FINISHED/RUN_ERROR -> finalize run
        Processor->>EventBus: emit sessionGenerating(false)
    end
    Processor->>EventBus: finalizeStream() -> onStreamEnd()

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 64.71%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Title check (✅ Passed): The title accurately summarizes the main change: introducing a subscription lifecycle amendment to the AI client.
  • Description check (✅ Passed): The PR description is comprehensive, well-structured, and clearly communicates the changes, rationale, design overview, implementation details, usage examples, migration notes, and correctness fixes.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/typescript/ai-preact/src/use-chat.ts (1)

58-94: ⚠️ Potential issue | 🟠 Major

Forward onCustomEvent through the Preact wrapper.

UseChatOptions still accepts onCustomEvent, but this constructor never passes it to ChatClient, so custom tool events are silently dropped in Preact while the React/Solid/Svelte bindings forward them.

🔧 Proposed fix
       onError: (err) => {
         optionsRef.current.onError?.(err)
       },
       tools: optionsRef.current.tools,
+      onCustomEvent: (eventType, data, context) => {
+        optionsRef.current.onCustomEvent?.(eventType, data, context)
+      },
       streamProcessor: options.streamProcessor,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-preact/src/use-chat.ts` around lines 58 - 94, The
ChatClient instantiation in useChat does not forward the onCustomEvent handler
from UseChatOptions, so custom tool events are dropped; update the new
ChatClient({...}) call to include onCustomEvent:
optionsRef.current.onCustomEvent (or options.onCustomEvent if you prefer
consistent non-ref usage) alongside the other callbacks so the ChatClient
receives and can emit custom events; look for the ChatClient constructor call
and add the onCustomEvent property matching the pattern used for
onFinish/onError/onChunk.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/typescript/ai-client/tests/chat-client.test.ts`:
- Around line 1278-1294: The test's connection.subscribe is defined as an async
generator that immediately throws, triggering the "generator without yield"
lint; update the subscribe implementation in the 'should surface subscription
loop failures without hanging' test (the connection.subscribe used to construct
ChatClient) to include a dummy yield (or otherwise return a value) before
throwing so the function is a valid generator (same fix pattern applied earlier
around lines 295-297): add a single yield statement prior to the throw so the
generator lint is satisfied while preserving the thrown Error('subscription
exploded') behavior.
- Around line 293-307: The test's inline generator assigned to the
connection.subscribe property throws before any yield, triggering require-yield
lint errors; update the mock to expose an async iterator instead (e.g., make
subscribe return an object whose async *[Symbol.asyncIterator]() throws) or
change the generator to yield once then throw so linters are satisfied, keeping
the call site that constructs new ChatClient({ connection }) and assertions
against client.getIsSubscribed() and client.getConnectionStatus() unchanged.

In `@packages/typescript/ai-react/tests/use-chat.test.ts`:
- Around line 1581-1655: The tests in use-chat.test.ts are only asserting the
final sessionGenerating=false and miss verifying the RUN_STARTED transition;
update both tests that use renderUseChat, the adapter.subscribe generator, and
result.current.sendMessage to emit RUN_STARTED after sendMessage is called (or
pause the generator until sendMessage) so you can assert
result.current.sessionGenerating becomes true, then assert it transitions back
to false (and isLoading false in the second test); specifically modify the
mocked subscribe flow used in the two cases and add assertions on
result.current.sessionGenerating === true immediately after sendMessage, then
waitFor it to become false.

In `@packages/typescript/ai-solid/src/use-chat.ts`:
- Around line 102-107: Remove the eager subscribe/unsubscribe block that runs
during hook setup: delete the conditional that calls client().subscribe() /
client().unsubscribe(). Instead rely on the existing createEffect (which already
applies the initial options.live state on mount) to perform connection side
effects inside Solid's lifecycle; ensure no other setup code outside
createEffect triggers client().subscribe()/unsubscribe().

In `@packages/typescript/ai-svelte/src/create-chat.svelte.ts`:
- Around line 98-100: The factory createChat currently calls client.subscribe()
when options.live is true but only returns stop(); extend the public API to
expose a teardown method (e.g., unsubscribe or teardownLive) that calls the chat
client's unsubscribe() to properly close the durable subscription, update the
returned object from createChat to include this method, and update the matching
declaration in packages/typescript/ai-svelte/src/types.ts so consumers can call
it (keep stop() for request-local cancellation and use
unsubscribe()/teardownLive for live-subscription teardown).

---

Outside diff comments:
In `@packages/typescript/ai-preact/src/use-chat.ts`:
- Around line 58-94: The ChatClient instantiation in useChat does not forward
the onCustomEvent handler from UseChatOptions, so custom tool events are
dropped; update the new ChatClient({...}) call to include onCustomEvent:
optionsRef.current.onCustomEvent (or options.onCustomEvent if you prefer
consistent non-ref usage) alongside the other callbacks so the ChatClient
receives and can emit custom events; look for the ChatClient constructor call
and add the onCustomEvent property matching the pattern used for
onFinish/onError/onChunk.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6db8c244-36cd-4dac-a86b-0fab7d1cddc6

📥 Commits

Reviewing files that changed from the base of the PR and between 572323b and 1a0f3d7.

📒 Files selected for processing (28)
  • packages/typescript/ai-client/src/chat-client.ts
  • packages/typescript/ai-client/src/connection-adapters.ts
  • packages/typescript/ai-client/src/index.ts
  • packages/typescript/ai-client/src/types.ts
  • packages/typescript/ai-client/tests/chat-client.test.ts
  • packages/typescript/ai-client/tests/connection-adapters.test.ts
  • packages/typescript/ai-client/tests/test-utils.ts
  • packages/typescript/ai-preact/src/types.ts
  • packages/typescript/ai-preact/src/use-chat.ts
  • packages/typescript/ai-preact/tests/use-chat.test.ts
  • packages/typescript/ai-react/src/types.ts
  • packages/typescript/ai-react/src/use-chat.ts
  • packages/typescript/ai-react/tests/use-chat.test.ts
  • packages/typescript/ai-solid/src/types.ts
  • packages/typescript/ai-solid/src/use-chat.ts
  • packages/typescript/ai-solid/tests/test-utils.ts
  • packages/typescript/ai-solid/tests/use-chat.test.ts
  • packages/typescript/ai-svelte/src/create-chat.svelte.ts
  • packages/typescript/ai-svelte/src/types.ts
  • packages/typescript/ai-svelte/tests/use-chat.test.ts
  • packages/typescript/ai-vue/src/types.ts
  • packages/typescript/ai-vue/src/use-chat.ts
  • packages/typescript/ai-vue/tests/test-utils.ts
  • packages/typescript/ai-vue/tests/use-chat.test.ts
  • packages/typescript/ai/src/activities/chat/stream/processor.ts
  • packages/typescript/ai/src/activities/chat/stream/types.ts
  • packages/typescript/ai/src/types.ts
  • packages/typescript/ai/tests/stream-processor.test.ts

Comment on lines +293 to +307
it('should expose connectionStatus error for subscription loop failures', async () => {
const connection = {
subscribe: async function* () {
throw new Error('subscription failed')
},
send: async () => {},
}
const client = new ChatClient({ connection })

client.subscribe()
await new Promise((resolve) => setTimeout(resolve, 0))

expect(client.getIsSubscribed()).toBe(false)
expect(client.getConnectionStatus()).toBe('error')
})

⚠️ Potential issue | 🟡 Minor

Generator function throws without yielding — consider alternative pattern.

Static analysis flags lines 295-297: the generator throws immediately without ever yielding. While this correctly tests subscription loop failure handling, it triggers lint errors (require-yield, useYield).

Consider an alternative that satisfies linters while preserving test behavior:

Suggested fix
     it('should expose connectionStatus error for subscription loop failures', async () => {
       const connection = {
-        subscribe: async function* () {
-          throw new Error('subscription failed')
-        },
+        subscribe: async function* () {
+          yield* (async function* () {
+            throw new Error('subscription failed')
+          })()
+        },
         send: async () => {},
       }

Or use a non-generator approach:

subscribe: () => ({
  async *[Symbol.asyncIterator]() {
    throw new Error('subscription failed')
  }
})
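A self-contained version of that non-generator mock can be sketched as follows (illustrative only; the error message mirrors the test above, everything else is an assumption):

```typescript
// Self-contained sketch of the non-generator mock: an object implementing the
// async-iterable protocol whose next() rejects, so the subscription loop fails
// without a yield-less generator that trips require-yield / useYield.
function failingSubscribe(): AsyncIterable<never> {
  return {
    [Symbol.asyncIterator]() {
      return {
        // Rejecting from next() surfaces the error to any for-await consumer.
        next: () => Promise.reject<IteratorResult<never>>(new Error('subscription failed')),
      }
    },
  }
}
```

Because `failingSubscribe` is a plain function returning an iterable, neither Biome nor ESLint sees a generator body to lint.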
🧰 Tools
🪛 Biome (2.4.4)

[error] 295-297: This generator function doesn't contain yield.

(lint/correctness/useYield)

🪛 ESLint

[error] 295-297: This generator function does not have 'yield'.

(require-yield)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/tests/chat-client.test.ts` around lines 293 -
307, The test's inline generator assigned to the connection.subscribe property
throws before any yield, triggering require-yield lint errors; update the mock
to expose an async iterator instead (e.g., make subscribe return an object whose
async *[Symbol.asyncIterator]() throws) or change the generator to yield once
then throw so linters are satisfied, keeping the call site that constructs new
ChatClient({ connection }) and assertions against client.getIsSubscribed() and
client.getConnectionStatus() unchanged.

Comment on lines +1278 to +1294
it('should surface subscription loop failures without hanging', async () => {
const connection = {
subscribe: async function* () {
throw new Error('subscription exploded')
},
send: async () => {},
}
const onError = vi.fn()
const client = new ChatClient({ connection, onError })

await client.sendMessage('Hello')

expect(onError).toHaveBeenCalledTimes(1)
expect(onError.mock.calls[0]?.[0]).toBeInstanceOf(Error)
expect(onError.mock.calls[0]?.[0].message).toBe('subscription exploded')
expect(client.getStatus()).toBe('error')
})

⚠️ Potential issue | 🟡 Minor

Same lint issue: generator without yield.

Lines 1280-1282 have the same static analysis warning as lines 295-297. Apply the same fix pattern here.

🧰 Tools
🪛 Biome (2.4.4)

[error] 1280-1282: This generator function doesn't contain yield.

(lint/correctness/useYield)

🪛 ESLint

[error] 1280-1282: This generator function does not have 'yield'.

(require-yield)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/tests/chat-client.test.ts` around lines 1278 -
1294, The test's connection.subscribe is defined as an async generator that
immediately throws, triggering the "generator without yield" lint; update the
subscribe implementation in the 'should surface subscription loop failures
without hanging' test (the connection.subscribe used to construct ChatClient) to
include a dummy yield (or otherwise return a value) before throwing so the
function is a valid generator (same fix pattern applied earlier around lines
295-297): add a single yield statement prior to the throw so the generator lint
is satisfied while preserving the thrown Error('subscription exploded')
behavior.

Comment on lines +1581 to +1655
describe('sessionGenerating', () => {
it('should expose sessionGenerating and update from stream run events', async () => {
const adapter: import('@tanstack/ai-client').SubscribeConnectionAdapter = {
subscribe: async function* (signal?: AbortSignal) {
yield {
type: 'RUN_STARTED' as const,
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
}
yield {
type: 'TEXT_MESSAGE_CONTENT' as const,
messageId: 'msg-1',
model: 'test',
timestamp: Date.now(),
delta: 'Hi',
content: 'Hi',
}
yield {
type: 'RUN_FINISHED' as const,
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
finishReason: 'stop' as const,
}
},
send: vi.fn(async () => {}),
}

const { result } = renderUseChat({ connection: adapter, live: true })

await waitFor(() => {
expect(result.current.isSubscribed).toBe(true)
})

await result.current.sendMessage('Hello')

await waitFor(() => {
expect(result.current.sessionGenerating).toBe(false)
})
})

it('should integrate correctly with live subscription lifecycle', async () => {
const adapter: import('@tanstack/ai-client').SubscribeConnectionAdapter = {
subscribe: async function* () {
yield {
type: 'RUN_STARTED' as const,
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
}
yield {
type: 'RUN_FINISHED' as const,
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
finishReason: 'stop' as const,
}
},
send: vi.fn(async () => {}),
}

const { result } = renderUseChat({ connection: adapter, live: true })

await waitFor(() => {
expect(result.current.isSubscribed).toBe(true)
})

await result.current.sendMessage('Hello')

await waitFor(() => {
expect(result.current.sessionGenerating).toBe(false)
expect(result.current.isLoading).toBe(false)
})
})

⚠️ Potential issue | 🟡 Minor

Assert the sessionGenerating transition, not just the final reset.

Both tests only check the terminal false state, and the mocked subscribe() emits RUN_STARTED/RUN_FINISHED before sendMessage() is called (Line 1616 / Line 1649). A regression where RUN_STARTED is ignored would still pass here, so this block doesn’t actually verify the lifecycle it claims to cover. Please sequence the stream around sendMessage() and assert that sessionGenerating flips to true before it returns to false.

🧪 Example of tightening the test
+ let releaseRun!: () => void
+ const runStarted = new Promise<void>((resolve) => {
+   releaseRun = resolve
+ })

  const adapter: import('@tanstack/ai-client').SubscribeConnectionAdapter = {
    subscribe: async function* () {
+     await runStarted
      yield {
        type: 'RUN_STARTED' as const,
        runId: 'run-1',
        model: 'test',
        timestamp: Date.now(),
      }
      yield {
        type: 'RUN_FINISHED' as const,
        runId: 'run-1',
        model: 'test',
        timestamp: Date.now(),
        finishReason: 'stop' as const,
      }
    },
-   send: vi.fn(async () => {}),
+   send: vi.fn(async () => {
+     releaseRun()
+   }),
  }

  const { result } = renderUseChat({ connection: adapter, live: true })

- await result.current.sendMessage('Hello')
+ const sendPromise = result.current.sendMessage('Hello')
+
+ await waitFor(() => {
+   expect(result.current.sessionGenerating).toBe(true)
+ })
+
+ await sendPromise

  await waitFor(() => {
    expect(result.current.sessionGenerating).toBe(false)
  })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-react/tests/use-chat.test.ts` around lines 1581 -
1655, The tests in use-chat.test.ts are only asserting the final
sessionGenerating=false and miss verifying the RUN_STARTED transition; update
both tests that use renderUseChat, the adapter.subscribe generator, and
result.current.sendMessage to emit RUN_STARTED after sendMessage is called (or
pause the generator until sendMessage) so you can assert
result.current.sessionGenerating becomes true, then assert it transitions back
to false (and isLoading false in the second test); specifically modify the
mocked subscribe flow used in the two cases and add assertions on
result.current.sessionGenerating === true immediately after sendMessage, then
waitFor it to become false.

Comment on lines +102 to +107
// Apply initial live mode immediately on hook creation.
if (options.live) {
client().subscribe()
} else {
client().unsubscribe()
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "use-chat.ts" | head -5

Repository: TanStack/ai

Length of output: 241


🏁 Script executed:

cat -n packages/typescript/ai-solid/src/use-chat.ts | sed -n '95,120p'

Repository: TanStack/ai

Length of output: 900


🏁 Script executed:

cat -n packages/typescript/ai-solid/src/use-chat.ts | head -50

Repository: TanStack/ai

Length of output: 2184


🏁 Script executed:

fd -e ts -e tsx "types.ts|types.d.ts" packages/typescript/ai-solid/src | xargs grep -l "UseChatOptions" 2>/dev/null

Repository: TanStack/ai

Length of output: 96


🏁 Script executed:

cat packages/typescript/ai-solid/src/types.ts

Repository: TanStack/ai

Length of output: 3676


🏁 Script executed:

cat -n packages/typescript/ai-solid/src/use-chat.ts | sed -n '90,130p'

Repository: TanStack/ai

Length of output: 1630


Remove the eager subscribe/unsubscribe from setup.

The createEffect below already applies the initial live state on mount. The eager block duplicates this logic and executes connection side effects during hook setup rather than within Solid's lifecycle system, violating the framework's composable patterns.

🔧 Proposed fix
-  // Apply initial live mode immediately on hook creation.
-  if (options.live) {
-    client().subscribe()
-  } else {
-    client().unsubscribe()
-  }
-
   createEffect(() => {
     if (options.live) {
       client().subscribe()
     } else {
       client().unsubscribe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-solid/src/use-chat.ts` around lines 102 - 107, Remove
the eager subscribe/unsubscribe block that runs during hook setup: delete the
conditional that calls client().subscribe() / client().unsubscribe(). Instead
rely on the existing createEffect (which already applies the initial
options.live state on mount) to perform connection side effects inside Solid's
lifecycle; ensure no other setup code outside createEffect triggers
client().subscribe()/unsubscribe().

Comment on lines +98 to +100
if (options.live) {
client.subscribe()
}

⚠️ Potential issue | 🟠 Major

Expose a teardown method for live subscriptions.

createChat({ live: true }) now starts the durable subscription loop, but this factory still only gives callers stop(). In packages/typescript/ai-client/src/chat-client.ts, stop() cancels request-local work; only unsubscribe() tears down the live subscription, so Svelte consumers have no way to release the connection on destroy.

🔧 Proposed fix
+  const unsubscribe = () => {
+    client.unsubscribe()
+  }
+
   const stop = () => {
     client.stop()
   }
@@
     reload,
     stop,
+    unsubscribe,
     setMessages,

packages/typescript/ai-svelte/src/types.ts will need the matching public method as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-svelte/src/create-chat.svelte.ts` around lines 98 -
100, The factory createChat currently calls client.subscribe() when options.live
is true but only returns stop(); extend the public API to expose a teardown
method (e.g., unsubscribe or teardownLive) that calls the chat client's
unsubscribe() to properly close the durable subscription, update the returned
object from createChat to include this method, and update the matching
declaration in packages/typescript/ai-svelte/src/types.ts so consumers can call
it (keep stop() for request-local cancellation and use
unsubscribe()/teardownLive for live-subscription teardown).

…on-support

Bring main into the durable session branch and resolve the generation adapter fallout by keeping ChatClient's subscribe/send support while narrowing one-shot generation APIs to connect-based adapters.

Made-with: Cursor
@samwillis samwillis force-pushed the samwillis/sub-lifecycle branch from 1a0f3d7 to b8ae243 Compare March 13, 2026 19:51
@changeset-bot
Copy link

changeset-bot bot commented Mar 13, 2026

🦋 Changeset detected

Latest commit: 16f983c

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 29 packages
Name Type
@tanstack/ai Patch
@tanstack/ai-client Patch
@tanstack/ai-preact Patch
@tanstack/ai-react Patch
@tanstack/ai-solid Patch
@tanstack/ai-svelte Patch
@tanstack/ai-vue Patch
@tanstack/ai-anthropic Patch
@tanstack/ai-devtools-core Patch
@tanstack/ai-elevenlabs Patch
@tanstack/ai-event-client Patch
@tanstack/ai-fal Patch
@tanstack/ai-gemini Patch
@tanstack/ai-grok Patch
@tanstack/ai-groq Patch
@tanstack/ai-ollama Patch
@tanstack/ai-openai Patch
@tanstack/ai-openrouter Patch
@tanstack/tests-adapters Patch
@tanstack/smoke-tests-e2e Patch
ts-svelte-chat Patch
ts-vue-chat Patch
@tanstack/ai-react-ui Patch
@tanstack/ai-solid-ui Patch
vanilla-chat Patch
@tanstack/ai-vue-ui Patch
@tanstack/preact-ai-devtools Patch
@tanstack/react-ai-devtools Patch
@tanstack/solid-ai-devtools Patch


Document the patch releases for the durable subscribe/send chat transport, generation client wrappers, and core stream processing changes included on this branch.

Made-with: Cursor
Add first-class `sessionGenerating` state to ChatClient and all framework
hooks, derived from stream run events (RUN_STARTED/RUN_FINISHED/RUN_ERROR).

Unlike `isLoading` (request-local), `sessionGenerating` reflects shared
generation activity visible to all subscribers, enabling typing indicators
for non-sender tabs/devices/users. Concurrent runs are tracked via a
Set<runId> so the flag stays true until the last active run completes.

Packages updated: ai-client, ai-react, ai-preact, ai-solid, ai-vue,
ai-svelte — with tests covering concurrent runs, lifecycle resets, and
callback deduplication.

Made-with: Cursor
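The Set<runId> tracking described in this commit message can be sketched as a minimal illustration (the RunEvent shape, class, and method names are hypothetical, not the actual ChatClient internals):

```typescript
// Minimal sketch of run-aware sessionGenerating tracking: the flag stays true
// until the last active run completes, and listeners are only notified on flips.
type RunEvent = { type: 'RUN_STARTED' | 'RUN_FINISHED' | 'RUN_ERROR'; runId: string }

class SessionGeneratingTracker {
  private activeRuns = new Set<string>()
  private listeners: Array<(generating: boolean) => void> = []
  private lastNotified?: boolean

  onChange(listener: (generating: boolean) => void): void {
    this.listeners.push(listener)
  }

  get sessionGenerating(): boolean {
    return this.activeRuns.size > 0
  }

  handle(event: RunEvent): void {
    if (event.type === 'RUN_STARTED') {
      this.activeRuns.add(event.runId)
    } else {
      // RUN_FINISHED and RUN_ERROR both retire the run.
      this.activeRuns.delete(event.runId)
    }
    // Deduplicate callbacks: notify only when the boolean actually changes.
    const generating = this.sessionGenerating
    if (generating !== this.lastNotified) {
      this.lastNotified = generating
      for (const listener of this.listeners) listener(generating)
    }
  }
}
```

With two concurrent runs, finishing the first leaves the flag true; only retiring the last run flips it back to false and fires the callback once.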
RUN_FINISHED was finalizing all active messages globally, corrupting
concurrent runs. Now tracks active runs by runId and only finalizes
when the last run completes.

Also fixes reconnect/resume: when TEXT_MESSAGE_CONTENT arrives for a
message that already exists (from initialMessages or MESSAGES_SNAPSHOT),
the processor now hydrates transient state from the existing message
instead of creating a duplicate. Segment text is seeded from the
existing last text part so deltas append correctly.

Made-with: Cursor
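The hydration behavior described in this commit message can be illustrated with a reduced model (the Message shape and helper name are hypothetical, not the StreamProcessor's real types):

```typescript
// Reduced model of the reconnect-hydration fix: when content arrives for a
// message id that already exists (from initialMessages or a snapshot), append
// to it instead of creating a duplicate entry.
interface Message {
  id: string
  content: string
}

function applyContentDelta(messages: Message[], messageId: string, delta: string): Message[] {
  const exists = messages.some((m) => m.id === messageId)
  if (exists) {
    // Hydrate: seed from the existing text so the delta appends correctly.
    return messages.map((m) => (m.id === messageId ? { ...m, content: m.content + delta } : m))
  }
  // Genuinely new message: create it.
  return [...messages, { id: messageId, content: delta }]
}
```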
Keep the framework wrapper type definitions lint-clean after the sub-lifecycle rebase so the full repo test suite passes again.

Made-with: Cursor
@samwillis samwillis force-pushed the samwillis/sub-lifecycle branch from b8ae243 to 16f983c Compare March 13, 2026 20:03
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/typescript/ai/src/activities/chat/stream/processor.ts (1)

1117-1150: ⚠️ Potential issue | 🟠 Major

Approval updates should target the tool call’s owning message, not the currently active one.

At Line 1117, approval-requested is applied to getActiveAssistantMessageId(). In multi-message interleaving, that can update the wrong message.

💡 Proposed fix
-    const messageId = this.getActiveAssistantMessageId()
-
     // Handle client tool input availability - trigger client-side execution
     if (chunk.name === 'tool-input-available' && chunk.value) {
@@
     if (chunk.name === 'approval-requested' && chunk.value) {
       const { toolCallId, toolName, input, approval } = chunk.value as {
         toolCallId: string
         toolName: string
         input: any
         approval: { id: string; needsApproval: boolean }
       }
+      const messageId =
+        this.toolCallToMessage.get(toolCallId) ??
+        this.getActiveAssistantMessageId()

       // Update the tool call part with approval state
       if (messageId) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai/src/activities/chat/stream/processor.ts` around lines
1117 - 1150, The approval-requested handler currently uses
getActiveAssistantMessageId(), which can update the wrong message in interleaved
conversations; instead resolve the owning message for the given toolCallId and
pass that message's id into updateToolCallApproval (e.g., call a helper like
getMessageIdForToolCall(toolCallId) or locate the message in this.messages by
matching the toolCallId on tool call parts), and only fall back to
getActiveAssistantMessageId() if no owner is found; then call
updateToolCallApproval(this.messages, owningMessageId, toolCallId, approval) so
the approval state is applied to the correct message.
♻️ Duplicate comments (2)
packages/typescript/ai-solid/src/use-chat.ts (1)

102-107: ⚠️ Potential issue | 🟠 Major

Remove the eager subscribe/unsubscribe from setup.

This block duplicates the logic in the createEffect below (lines 109-115) which already applies the initial live state on mount. Executing connection side effects during hook setup—outside of Solid's lifecycle system—violates the framework's composable patterns and can cause issues with SSR or strict mode.

🔧 Proposed fix
-  // Apply initial live mode immediately on hook creation.
-  if (options.live) {
-    client().subscribe()
-  } else {
-    client().unsubscribe()
-  }
-
   createEffect(() => {
     if (options.live) {
       client().subscribe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-solid/src/use-chat.ts` around lines 102 - 107, Remove
the eager subscribe/unsubscribe call in the hook setup: delete the if
(options.live) { client().subscribe() } else { client().unsubscribe() } block so
connection side-effects are only applied inside the existing createEffect (which
reads options.live and calls client().subscribe()/unsubscribe()); this ensures
client() connection logic runs inside Solid's lifecycle rather than during hook
initialization.
packages/typescript/ai-client/tests/chat-client.test.ts (1)

293-297: ⚠️ Potential issue | 🟡 Minor

Same lint blocker: throw-only async generator mocks.

Line 295 and Line 1280 still define async generators that throw before any yield, so Biome/ESLint will keep failing this file (useYield / require-yield). Return an async iterator object instead, or add a dummy yield before the throw in both tests.

Also applies to: 1280-1282

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/tests/chat-client.test.ts` around lines 293 -
297, The async generator mocks throw before yielding (e.g., the
connection.subscribe async function* in the "should expose connectionStatus
error for subscription loop failures" test and the similar async generator at
1280-1282), which trips the require-yield/useYield lint rules; fix by replacing
those async function* mocks with an async iterator object that rejects from
next() (i.e., return { async next() { return Promise.reject(new Error('...')) },
async return() {...}, async throw() {...} } ) or simply add a dummy yield before
the throw so the generator yields once before throwing; update the mock for
connection.subscribe and the other async generator accordingly.
🧹 Nitpick comments (5)
packages/typescript/ai-react/src/types.ts (1)

6-12: Consider re-exporting ConnectionStatus for consumer convenience.

ConnectionStatus is imported and used in UseChatReturn but not included in the re-exports on line 12. Consumers who need to type their own state or callbacks based on this type would need to import directly from @tanstack/ai-client.

💡 Suggested change
 // Re-export types from ai-client
-export type { ChatRequestBody, MultimodalContent, UIMessage }
+export type { ChatRequestBody, ConnectionStatus, MultimodalContent, UIMessage }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-react/src/types.ts` around lines 6 - 12, The file
imports ConnectionStatus but doesn't re-export it, so add ConnectionStatus to
the exported types to make it available to consumers; update the export
statement in types.ts (the export type line that currently exports
ChatRequestBody, MultimodalContent, UIMessage) to also include ConnectionStatus
so types like UseChatReturn can be fully typed by library users.
packages/typescript/ai-react/src/use-chat.ts (1)

115-122: Guard against unnecessary initial unsubscribe() call to avoid triggering state callbacks on mount.

When options.live is initially undefined or false, the else branch calls client.unsubscribe() on mount. This triggers setIsSubscribed(false) and setConnectionStatus('disconnected') callbacks even though the client is already in that state, causing unnecessary state change notifications.

💡 Suggested change
   useEffect(() => {
     if (options.live) {
       client.subscribe()
-    } else {
+    } else if (options.live === false) {
+      // Only explicitly unsubscribe when live transitions from true to false,
+      // not on initial mount when live is undefined
       client.unsubscribe()
     }
   }, [client, options.live])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-react/src/use-chat.ts` around lines 115 - 122, The
effect calling client.subscribe()/client.unsubscribe() should avoid calling
unsubscribe on mount when the client is already disconnected; update the
useEffect (the block referencing useEffect, options.live, client.subscribe,
client.unsubscribe) to only call client.unsubscribe() when there was a previous
active subscription (e.g., track previous options.live with a ref like
prevLiveRef or check a client.isSubscribed flag) so that on initial mount with
options.live undefined/false you do nothing, but on transitions true→false you
call client.unsubscribe() and trigger setIsSubscribed/setConnectionStatus only
when state actually changes.
packages/typescript/ai-preact/src/types.ts (1)

30-40: Sync the UseChatOptions JSDoc with the new omitted lifecycle callbacks.

The Omit list now excludes onSubscriptionChange, onConnectionStatusChange, and onSessionGeneratingChange, but the comment block still documents only the original four callbacks. Keeping this list complete avoids API confusion.

📝 Suggested doc patch
  * - `onStatusChange` - Managed by Preact state (exposed as `status`)
+ * - `onSubscriptionChange` - Managed by Preact state (exposed as `isSubscribed`)
+ * - `onConnectionStatusChange` - Managed by Preact state (exposed as `connectionStatus`)
+ * - `onSessionGeneratingChange` - Managed by Preact state (exposed as `sessionGenerating`)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-preact/src/types.ts` around lines 30 - 40, Update the
JSDoc for the UseChatOptions type to match the current Omit list: include
documentation entries for the newly omitted lifecycle callbacks
onSubscriptionChange, onConnectionStatusChange, and onSessionGeneratingChange in
addition to the existing onMessagesChange, onLoadingChange, onErrorChange, and
onStatusChange; locate the JSDoc immediately above the UseChatOptions
declaration and add brief descriptions for each callback so the comment
accurately reflects the type's omitted properties.
packages/typescript/ai-svelte/src/create-generate-image.svelte.ts (1)

69-69: Minor: Inconsistent dash character in JSDoc.

This file uses -- (double hyphen) while other files in this PR (e.g., use-generate-image.ts in React/Vue) use — (em dash) for the same transport mode description pattern.

🔧 Suggested fix for consistency
-* - **ConnectConnectionAdapter** -- Streaming transport (SSE, HTTP stream, custom)
+* - **ConnectConnectionAdapter** — Streaming transport (SSE, HTTP stream, custom)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-svelte/src/create-generate-image.svelte.ts` at line
69, Update the JSDoc line in create-generate-image.svelte.ts to use an em dash
for consistency: replace the double hyphen in the description for
ConnectConnectionAdapter with an em dash (match the style used in
use-generate-image.ts and other files) so the line reads
"ConnectConnectionAdapter — Streaming transport (SSE, HTTP stream, custom)";
locate the JSDoc that references ConnectConnectionAdapter to make this
single-character change.
packages/typescript/ai/tests/stream-processor.test.ts (1)

2209-2244: Add one regression test for tool-call routing after pending ID reconciliation.

Current dedup tests validate text routing after startAssistantMessage() ID swap, but not TOOL_CALL_START (before swap) followed by TOOL_CALL_ARGS/END (after swap). Adding that case would lock in the map-rewrite behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai/tests/stream-processor.test.ts` around lines 2209 -
2244, Add a regression test in the same suite that mirrors the existing
TEXT_MESSAGE_START dedup case but for tool calls: instantiate StreamProcessor,
call startAssistantMessage() to create the pending manual tool-call message,
then process a TOOL_CALL_START chunk with a different messageId (e.g.,
'server-tool-1') and tool name, then send TOOL_CALL_ARGS and TOOL_CALL_END
chunks using the server messageId, finalize the stream, and assert there is only
one message whose id was updated to the server id and whose parts include the
tool-call args and correct tool metadata; use the StreamProcessor methods
startAssistantMessage, processChunk, getMessages, and finalizeStream and chunk
types TOOL_CALL_START, TOOL_CALL_ARGS, TOOL_CALL_END to locate where to add the
test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/typescript/ai-client/tests/chat-client.test.ts`:
- Around line 173-186: The test must assert that replacing the connection tears
down the old subscription: modify the test helper createSubscribeAdapter (used
to build adapter1/adapter2) so its subscribe returns a subscription with an
explicit abort/teardown side-effect (e.g., set a flag like wasAborted/wasClosed
or call a provided onFinally callback), then in this test call
client.subscribe(), replace the connection via client.updateOptions({
connection: adapter2 }) and assert adapter1's teardown flag was flipped (true)
in addition to asserting adapter2.subscribe was called; apply the same
change/assertion to the other replacement test block covering the second case.
- Around line 1991-2220: The tests record only shallow snapshots and assert only
final state; change the onMessagesChange handler in both tests (the one pushing
to messagesSnapshots) to deep-clone the messages (e.g., use structuredClone or
JSON.parse(JSON.stringify(msgs))) so intermediate emissions are preserved, then
add assertions over messagesSnapshots (inside the 'concurrent runs should not
produce duplicate messages or corrupt content' test ensure every snapshot has at
most one message per id and that msg-a and msg-b parts never show corrupted or
interleaved content; inside 'reconnect with initialMessages should not duplicate
assistant message on content arrival' ensure every snapshot length stays 2 and
the assistant message content only grows by appending "time..." without creating
a duplicate entry). Use the existing symbols messagesSnapshots,
onMessagesChange, client.subscribe, and the two it(...) test names to locate
where to modify and add these snapshot assertions.

In `@packages/typescript/ai-preact/src/use-chat.ts`:
- Around line 130-134: The live-mode cleanup only calls client.unsubscribe(),
which aborts work but doesn't emit the stopped() event; modify the cleanup in
useChat to call client.stop() before client.unsubscribe() when options.live is
true (i.e., change the branch handling the client in the cleanup to invoke
client.stop() then client.unsubscribe()) so the stopped event fires consistently
with the non-live path that calls client.stop().

In `@packages/typescript/ai-vue/src/use-chat.ts`:
- Around line 91-97: The dispose logic should always call client.unsubscribe()
to fully tear down any subscription loops: replace the conditional in
onScopeDispose that calls client.stop() when options.live is false so that
onScopeDispose always invokes client.unsubscribe() (retain client.stop() only if
there's additional teardown needed, but ensure client.unsubscribe() is executed
unconditionally) — update the onScopeDispose closure referencing options.live
and client so unsubscribe() is always called.

In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 646-663: When remapping a pending message ID (pendingId ->
messageId) in the block that updates this.messages, this.messageStates and
this.activeMessageIds, also reconcile the tool-call routing map
toolCallToMessage so existing tool calls continue to route: look up
toolCallToMessage.get(pendingId), and if present set
toolCallToMessage.set(messageId, thatValue) and delete the pendingId entry; do
this in the same if (pendingId !== messageId) branch so
TOOL_CALL_ARGS/TOOL_CALL_END still find the message by the new ID.

---

Outside diff comments:
In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 1117-1150: The approval-requested handler currently uses
getActiveAssistantMessageId(), which can update the wrong message in interleaved
conversations; instead resolve the owning message for the given toolCallId and
pass that message's id into updateToolCallApproval (e.g., call a helper like
getMessageIdForToolCall(toolCallId) or locate the message in this.messages by
matching the toolCallId on tool call parts), and only fall back to
getActiveAssistantMessageId() if no owner is found; then call
updateToolCallApproval(this.messages, owningMessageId, toolCallId, approval) so
the approval state is applied to the correct message.
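The owner-lookup described above can be sketched outside the processor. The message/part shapes and the helper name below are illustrative, not the processor's real internals:

```typescript
// Hypothetical sketch: resolve the message that owns a toolCallId by
// scanning tool-call parts, falling back to the active assistant message id.
type ToolPart = { type: 'tool-call'; toolCallId: string }
type TextPart = { type: 'text'; content: string }
type Msg = { id: string; parts: Array<ToolPart | TextPart> }

function getMessageIdForToolCall(
  messages: Array<Msg>,
  toolCallId: string,
  fallbackId: string,
): string {
  for (const msg of messages) {
    for (const part of msg.parts) {
      if (part.type === 'tool-call' && part.toolCallId === toolCallId) {
        return msg.id // the message that actually owns this tool call
      }
    }
  }
  return fallbackId // no owner found: fall back to the active assistant message
}

// Interleaved conversation: tc-1 belongs to asst-1, not the active asst-2.
const messages: Array<Msg> = [
  { id: 'asst-1', parts: [{ type: 'tool-call', toolCallId: 'tc-1' }] },
  { id: 'asst-2', parts: [{ type: 'text', content: 'interleaved reply' }] },
]
console.log(getMessageIdForToolCall(messages, 'tc-1', 'asst-2')) // 'asst-1'
```

With this helper, the approval handler would apply state to `asst-1` even while `asst-2` is the active assistant message.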

---

Duplicate comments:
In `@packages/typescript/ai-client/tests/chat-client.test.ts`:
- Around line 293-297: The async generator mocks throw before yielding (e.g.,
the connection.subscribe async function* in the "should expose connectionStatus
error for subscription loop failures" test and the similar async generator at
1280-1282), which trips the require-yield/useYield lint rules; fix by replacing
those async function* mocks with an async iterator object that rejects from
next() (i.e., return { async next() { return Promise.reject(new Error('...')) },
async return() {...}, async throw() {...} } ) or simply add a dummy yield before
the throw so the generator yields once before throwing; update the mock for
connection.subscribe and the other async generator accordingly.
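The lint-safe mock shape suggested above can be sketched standalone. This is an illustrative failing `subscribe` replacement, not the repo's actual test code:

```typescript
// Hypothetical sketch: a subscribe mock that fails on first pull without
// tripping require-yield, by hand-rolling an async iterator whose next()
// rejects instead of using an async generator that throws before yielding.
function failingSubscribe(message: string): AsyncIterableIterator<never> {
  const iterator: AsyncIterableIterator<never> = {
    [Symbol.asyncIterator]() {
      return iterator
    },
    next() {
      return Promise.reject(new Error(message))
    },
    return() {
      return Promise.resolve({ value: undefined, done: true as const })
    },
  }
  return iterator
}

// Consuming it behaves like a generator that throws on the first pull:
async function main(): Promise<string> {
  try {
    for await (const _chunk of failingSubscribe('subscription loop failed')) {
      // never reached
    }
    return 'no error'
  } catch (err) {
    return (err as Error).message
  }
}

main().then((msg) => console.log(msg))
```

Because there is no `function*` body at all, there is nothing for the require-yield rule to flag, while consumers still observe the same rejection.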

In `@packages/typescript/ai-solid/src/use-chat.ts`:
- Around line 102-107: Remove the eager subscribe/unsubscribe call in the hook
setup: delete the if (options.live) { client().subscribe() } else {
client().unsubscribe() } block so connection side-effects are only applied
inside the existing createEffect (which reads options.live and calls
client().subscribe()/unsubscribe()); this ensures client() connection logic runs
inside Solid's lifecycle rather than during hook initialization.

---

Nitpick comments:
In `@packages/typescript/ai-preact/src/types.ts`:
- Around line 30-40: Update the JSDoc for the UseChatOptions type to match the
current Omit list: include documentation entries for the newly omitted lifecycle
callbacks onSubscriptionChange, onConnectionStatusChange, and
onSessionGeneratingChange in addition to the existing onMessagesChange,
onLoadingChange, onErrorChange, and onStatusChange; locate the JSDoc immediately
above the UseChatOptions declaration and add brief descriptions for each
callback so the comment accurately reflects the type's omitted properties.

In `@packages/typescript/ai-react/src/types.ts`:
- Around line 6-12: The file imports ConnectionStatus but doesn't re-export it,
so add ConnectionStatus to the exported types to make it available to consumers;
update the export statement in types.ts (the export type line that currently
exports ChatRequestBody, MultimodalContent, UIMessage) to also include
ConnectionStatus so types like UseChatReturn can be fully typed by library
users.

In `@packages/typescript/ai-react/src/use-chat.ts`:
- Around line 115-122: The effect calling
client.subscribe()/client.unsubscribe() should avoid calling unsubscribe on
mount when the client is already disconnected; update the useEffect (the block
referencing useEffect, options.live, client.subscribe, client.unsubscribe) to
only call client.unsubscribe() when there was a previous active subscription
(e.g., track previous options.live with a ref like prevLiveRef or check a
client.isSubscribed flag) so that on initial mount with options.live
undefined/false you do nothing, but on transitions true→false you call
client.unsubscribe() and trigger setIsSubscribed/setConnectionStatus only when
state actually changes.
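The transition-aware effect body can be sketched framework-free. Here `prevLive` stands in for the suggested `prevLiveRef.current`; the client shape is illustrative:

```typescript
// Hypothetical sketch: only unsubscribe on a true -> false transition of
// `live`, never on initial mount when there was no prior subscription.
type Client = { subscribe(): void; unsubscribe(): void }

function makeLiveEffect(client: Client) {
  let prevLive: boolean | undefined // stands in for prevLiveRef.current
  return (live: boolean | undefined) => {
    if (live) {
      client.subscribe()
    } else if (prevLive) {
      // Transition true -> false: tear down the prior subscription.
      client.unsubscribe()
    }
    // else: initial mount with live falsy, do nothing.
    prevLive = live
  }
}

// Simulate mount with live undefined, then true, then false.
const calls: Array<string> = []
const effect = makeLiveEffect({
  subscribe: () => calls.push('subscribe'),
  unsubscribe: () => calls.push('unsubscribe'),
})
effect(undefined) // mount: no-op, no spurious unsubscribe
effect(true) // subscribe
effect(false) // unsubscribe
console.log(calls) // [ 'subscribe', 'unsubscribe' ]
```

The same idea works with a `client.isSubscribed` check in place of the previous-value tracking.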

In `@packages/typescript/ai-svelte/src/create-generate-image.svelte.ts`:
- Line 69: Update the JSDoc line in create-generate-image.svelte.ts to use an em
dash for consistency: replace the double hyphen in the description for
ConnectConnectionAdapter with an em dash (match the style used in
use-generate-image.ts and other files) so the line reads
"ConnectConnectionAdapter — Streaming transport (SSE, HTTP stream, custom)";
locate the JSDoc that references ConnectConnectionAdapter to make this
single-character change.

In `@packages/typescript/ai/tests/stream-processor.test.ts`:
- Around line 2209-2244: Add a regression test in the same suite that mirrors
the existing TEXT_MESSAGE_START dedup case but for tool calls: instantiate
StreamProcessor, call startAssistantMessage() to create the pending manual
tool-call message, then process a TOOL_CALL_START chunk with a different
messageId (e.g., 'server-tool-1') and tool name, then send TOOL_CALL_ARGS and
TOOL_CALL_END chunks using the server messageId, finalize the stream, and assert
there is only one message whose id was updated to the server id and whose parts
include the tool-call args and correct tool metadata; use the StreamProcessor
methods startAssistantMessage, processChunk, getMessages, and finalizeStream and
chunk types TOOL_CALL_START, TOOL_CALL_ARGS, TOOL_CALL_END to locate where to
add the test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b0bc5be3-2787-43be-8768-7310eb75a659

📥 Commits

Reviewing files that changed from the base of the PR and between 1a0f3d7 and b8ae243.

📒 Files selected for processing (53)
  • packages/typescript/ai-client/src/chat-client.ts
  • packages/typescript/ai-client/src/generation-client.ts
  • packages/typescript/ai-client/src/generation-types.ts
  • packages/typescript/ai-client/src/index.ts
  • packages/typescript/ai-client/src/types.ts
  • packages/typescript/ai-client/src/video-generation-client.ts
  • packages/typescript/ai-client/tests/chat-client.test.ts
  • packages/typescript/ai-client/tests/connection-adapters.test.ts
  • packages/typescript/ai-client/tests/generation-client.test.ts
  • packages/typescript/ai-client/tests/video-generation-client.test.ts
  • packages/typescript/ai-preact/src/types.ts
  • packages/typescript/ai-preact/src/use-chat.ts
  • packages/typescript/ai-preact/tests/use-chat.test.ts
  • packages/typescript/ai-react/src/types.ts
  • packages/typescript/ai-react/src/use-chat.ts
  • packages/typescript/ai-react/src/use-generate-image.ts
  • packages/typescript/ai-react/src/use-generate-speech.ts
  • packages/typescript/ai-react/src/use-generate-video.ts
  • packages/typescript/ai-react/src/use-generation.ts
  • packages/typescript/ai-react/src/use-summarize.ts
  • packages/typescript/ai-react/src/use-transcription.ts
  • packages/typescript/ai-react/tests/use-chat.test.ts
  • packages/typescript/ai-solid/src/types.ts
  • packages/typescript/ai-solid/src/use-chat.ts
  • packages/typescript/ai-solid/src/use-generate-image.ts
  • packages/typescript/ai-solid/src/use-generate-speech.ts
  • packages/typescript/ai-solid/src/use-generate-video.ts
  • packages/typescript/ai-solid/src/use-generation.ts
  • packages/typescript/ai-solid/src/use-summarize.ts
  • packages/typescript/ai-solid/src/use-transcription.ts
  • packages/typescript/ai-solid/tests/test-utils.ts
  • packages/typescript/ai-solid/tests/use-chat.test.ts
  • packages/typescript/ai-svelte/src/create-chat.svelte.ts
  • packages/typescript/ai-svelte/src/create-generate-image.svelte.ts
  • packages/typescript/ai-svelte/src/create-generate-speech.svelte.ts
  • packages/typescript/ai-svelte/src/create-generate-video.svelte.ts
  • packages/typescript/ai-svelte/src/create-generation.svelte.ts
  • packages/typescript/ai-svelte/src/create-summarize.svelte.ts
  • packages/typescript/ai-svelte/src/create-transcription.svelte.ts
  • packages/typescript/ai-svelte/src/types.ts
  • packages/typescript/ai-svelte/tests/use-chat.test.ts
  • packages/typescript/ai-vue/src/types.ts
  • packages/typescript/ai-vue/src/use-chat.ts
  • packages/typescript/ai-vue/src/use-generate-image.ts
  • packages/typescript/ai-vue/src/use-generate-speech.ts
  • packages/typescript/ai-vue/src/use-generate-video.ts
  • packages/typescript/ai-vue/src/use-generation.ts
  • packages/typescript/ai-vue/src/use-summarize.ts
  • packages/typescript/ai-vue/src/use-transcription.ts
  • packages/typescript/ai-vue/tests/test-utils.ts
  • packages/typescript/ai-vue/tests/use-chat.test.ts
  • packages/typescript/ai/src/activities/chat/stream/processor.ts
  • packages/typescript/ai/tests/stream-processor.test.ts
🚧 Files skipped from review as they are similar to previous changes (11)
  • packages/typescript/ai-solid/tests/test-utils.ts
  • packages/typescript/ai-vue/tests/use-chat.test.ts
  • packages/typescript/ai-vue/src/types.ts
  • packages/typescript/ai-vue/tests/test-utils.ts
  • packages/typescript/ai-react/tests/use-chat.test.ts
  • packages/typescript/ai-svelte/src/create-chat.svelte.ts
  • packages/typescript/ai-preact/tests/use-chat.test.ts
  • packages/typescript/ai-svelte/src/types.ts
  • packages/typescript/ai-solid/tests/use-chat.test.ts
  • packages/typescript/ai-client/src/types.ts
  • packages/typescript/ai-client/src/index.ts

Comment on lines +173 to +186
it('should re-subscribe on connection update when previously subscribed', () => {
  const adapter1 = createSubscribeAdapter([])
  const adapter2 = createSubscribeAdapter([])
  const client = new ChatClient({ connection: adapter1 })

  client.subscribe()
  expect(client.getIsSubscribed()).toBe(true)
  expect(adapter1.subscribe).toHaveBeenCalledTimes(1)

  client.updateOptions({ connection: adapter2 })

  expect(client.getIsSubscribed()).toBe(true)
  expect(adapter2.subscribe).toHaveBeenCalledTimes(1)
})

⚠️ Potential issue | 🟠 Major

Assert that adapter replacement actually tears down the old subscription.

Both replacement tests prove the new connection is used, but neither proves adapter1/connection1 was aborted. A regression where updateOptions({ connection }) starts the replacement loop without closing the old one would still pass here and can reintroduce duplicate-stream behavior. Please expose an abort/finally flag on the first connection and assert it flips during replacement.

Also applies to: 494-535

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/tests/chat-client.test.ts` around lines 173 -
186, The test must assert that replacing the connection tears down the old
subscription: modify the test helper createSubscribeAdapter (used to build
adapter1/adapter2) so its subscribe returns a subscription with an explicit
abort/teardown side-effect (e.g., set a flag like wasAborted/wasClosed or call a
provided onFinally callback), then in this test call client.subscribe(), replace
the connection via client.updateOptions({ connection: adapter2 }) and assert
adapter1's teardown flag was flipped (true) in addition to asserting
adapter2.subscribe was called; apply the same change/assertion to the other
replacement test block covering the second case.

Comment on lines +1991 to +2220
it('concurrent runs should not produce duplicate messages or corrupt content', async () => {
  const wake = { fn: null as (() => void) | null }
  const chunks: Array<StreamChunk> = []
  const connection = {
    subscribe: async function* (signal?: AbortSignal) {
      while (!signal?.aborted) {
        if (chunks.length > 0) {
          const batch = chunks.splice(0)
          for (const chunk of batch) {
            yield chunk
          }
        }
        await new Promise<void>((resolve) => {
          wake.fn = resolve
          const onAbort = () => resolve()
          signal?.addEventListener('abort', onAbort, { once: true })
        })
      }
    },
    send: async () => {
      wake.fn?.()
    },
  }

  const messagesSnapshots: Array<Array<UIMessage>> = []
  const client = new ChatClient({
    connection,
    onMessagesChange: (msgs) => {
      messagesSnapshots.push(msgs.map((m) => ({ ...m })))
    },
  })

  client.subscribe()
  await new Promise((resolve) => setTimeout(resolve, 10))

  // Run A starts with text message
  chunks.push(
    {
      type: 'RUN_STARTED',
      runId: 'run-a',
      model: 'test',
      timestamp: Date.now(),
    },
    {
      type: 'TEXT_MESSAGE_START',
      messageId: 'msg-a',
      role: 'assistant',
      model: 'test',
      timestamp: Date.now(),
    } as StreamChunk,
    {
      type: 'TEXT_MESSAGE_CONTENT',
      messageId: 'msg-a',
      model: 'test',
      timestamp: Date.now(),
      delta: 'Story: ',
    } as StreamChunk,
  )
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  // Run B starts concurrently
  chunks.push(
    {
      type: 'RUN_STARTED',
      runId: 'run-b',
      model: 'test',
      timestamp: Date.now(),
    },
    {
      type: 'TEXT_MESSAGE_START',
      messageId: 'msg-b',
      role: 'assistant',
      model: 'test',
      timestamp: Date.now(),
    } as StreamChunk,
    {
      type: 'TEXT_MESSAGE_CONTENT',
      messageId: 'msg-b',
      model: 'test',
      timestamp: Date.now(),
      delta: 'Hi!',
    } as StreamChunk,
  )
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  // Run B finishes — Run A should still be active
  chunks.push({
    type: 'RUN_FINISHED',
    runId: 'run-b',
    model: 'test',
    timestamp: Date.now(),
    finishReason: 'stop',
  })
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  // Run A continues streaming
  chunks.push({
    type: 'TEXT_MESSAGE_CONTENT',
    messageId: 'msg-a',
    model: 'test',
    timestamp: Date.now(),
    delta: 'once upon a time',
  } as StreamChunk)
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  // Verify msg-a still has correct content after run-b finished
  const messages = client.getMessages()
  const msgA = messages.find((m) => m.id === 'msg-a')
  const msgB = messages.find((m) => m.id === 'msg-b')

  expect(msgA).toBeDefined()
  expect(msgB).toBeDefined()
  expect(msgA!.parts[0]).toEqual({
    type: 'text',
    content: 'Story: once upon a time',
  })
  expect(msgB!.parts[0]).toEqual({ type: 'text', content: 'Hi!' })

  // No duplicate messages
  expect(messages.filter((m) => m.id === 'msg-a')).toHaveLength(1)
  expect(messages.filter((m) => m.id === 'msg-b')).toHaveLength(1)

  // Finish run A
  chunks.push({
    type: 'RUN_FINISHED',
    runId: 'run-a',
    model: 'test',
    timestamp: Date.now(),
    finishReason: 'stop',
  })
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  expect(client.getSessionGenerating()).toBe(false)
  client.unsubscribe()
})

it('reconnect with initialMessages should not duplicate assistant message on content arrival', async () => {
  const wake = { fn: null as (() => void) | null }
  const chunks: Array<StreamChunk> = []
  const connection = {
    subscribe: async function* (signal?: AbortSignal) {
      while (!signal?.aborted) {
        if (chunks.length > 0) {
          const batch = chunks.splice(0)
          for (const chunk of batch) {
            yield chunk
          }
        }
        await new Promise<void>((resolve) => {
          wake.fn = resolve
          const onAbort = () => resolve()
          signal?.addEventListener('abort', onAbort, { once: true })
        })
      }
    },
    send: async () => {
      wake.fn?.()
    },
  }

  // Simulate reconnect: client created with initialMessages (from SSR/snapshot)
  const initialMessages: Array<UIMessage> = [
    {
      id: 'user-1',
      role: 'user',
      parts: [{ type: 'text', content: 'Tell me a story' }],
      createdAt: new Date(),
    },
    {
      id: 'asst-1',
      role: 'assistant',
      parts: [{ type: 'text', content: 'Once upon a ' }],
      createdAt: new Date(),
    },
  ]

  const client = new ChatClient({
    connection,
    initialMessages,
  })

  client.subscribe()
  await new Promise((resolve) => setTimeout(resolve, 10))

  // Resumed content for in-progress message (no TEXT_MESSAGE_START)
  chunks.push(
    {
      type: 'RUN_STARTED',
      runId: 'run-1',
      model: 'test',
      timestamp: Date.now(),
    },
    {
      type: 'TEXT_MESSAGE_CONTENT',
      messageId: 'asst-1',
      model: 'test',
      timestamp: Date.now(),
      delta: 'time...',
    } as StreamChunk,
    {
      type: 'RUN_FINISHED',
      runId: 'run-1',
      model: 'test',
      timestamp: Date.now(),
      finishReason: 'stop',
    },
  )
  wake.fn?.()
  await new Promise((resolve) => setTimeout(resolve, 20))

  const messages = client.getMessages()

  // Should still have exactly 2 messages, not 3
  expect(messages).toHaveLength(2)

  // Content should be correctly appended
  const asstMsg = messages.find((m) => m.id === 'asst-1')
  expect(asstMsg).toBeDefined()
  expect(asstMsg!.parts[0]).toEqual({
    type: 'text',
    content: 'Once upon a time...',
  })

  client.unsubscribe()
})

⚠️ Potential issue | 🟠 Major

These regression tests still miss transient corruption.

The concurrent-run test records messagesSnapshots but never asserts on them, and { ...m } only shallow-copies each message shell. The reconnect test does not capture intermediate emissions at all. If a duplicate assistant message or bad text splice appears for one render and then self-heals before the final assertion, both tests still pass even though the UI flickered. Please deep-clone intermediate emissions and assert the invariants on every snapshot, not just the terminal state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/tests/chat-client.test.ts` around lines 1991 -
2220, The tests record only shallow snapshots and assert only final state;
change the onMessagesChange handler in both tests (the one pushing to
messagesSnapshots) to deep-clone the messages (e.g., use structuredClone or
JSON.parse(JSON.stringify(msgs))) so intermediate emissions are preserved, then
add assertions over messagesSnapshots (inside the 'concurrent runs should not
produce duplicate messages or corrupt content' test ensure every snapshot has at
most one message per id and that msg-a and msg-b parts never show corrupted or
interleaved content; inside 'reconnect with initialMessages should not duplicate
assistant message on content arrival' ensure every snapshot length stays 2 and
the assistant message content only grows by appending "time..." without creating
a duplicate entry). Use the existing symbols messagesSnapshots,
onMessagesChange, client.subscribe, and the two it(...) test names to locate
where to modify and add these snapshot assertions.
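The shallow-versus-deep distinction can be shown independently of the test harness. `structuredClone` (available in Node 17+) preserves each emission even when the producer later mutates nested parts in place; the message shape below is illustrative:

```typescript
type Part = { type: 'text'; content: string }
type Msg = { id: string; parts: Array<Part> }

const shallowSnapshots: Array<Array<Msg>> = []
const deepSnapshots: Array<Array<Msg>> = []

const onMessagesChange = (msgs: Array<Msg>) => {
  shallowSnapshots.push(msgs.map((m) => ({ ...m }))) // copies the shell only
  deepSnapshots.push(structuredClone(msgs)) // preserves nested parts too
}

// Simulate a streamed message whose part is mutated in place between emissions.
const live: Array<Msg> = [
  { id: 'msg-a', parts: [{ type: 'text', content: 'Story: ' }] },
]
onMessagesChange(live)
live[0].parts[0].content += 'once upon a time'
onMessagesChange(live)

// The shallow copy shares `parts`, so history is silently rewritten:
console.log(shallowSnapshots[0][0].parts[0].content) // 'Story: once upon a time'
// The deep clone keeps the intermediate emission intact:
console.log(deepSnapshots[0][0].parts[0].content) // 'Story: '

// Invariant check over every snapshot, as suggested above:
for (const snapshot of deepSnapshots) {
  const ids = snapshot.map((m) => m.id)
  if (new Set(ids).size !== ids.length) {
    throw new Error('duplicate message id in an intermediate snapshot')
  }
}
```

Only the deep clones make transient duplicates or corrupted splices visible to per-snapshot assertions.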

Comment on lines +130 to +134
if (options.live) {
  client.unsubscribe()
} else {
  client.stop()
}

⚠️ Potential issue | 🟠 Major

❓ Verification inconclusive

Script executed:

#!/bin/bash
set -euo pipefail

# Verify ChatClient stop/unsubscribe semantics in repo source.
fd 'chat-client\.ts$' -t f | while read -r f; do
  echo "=== $f ==="
  rg -n -C4 '\b(stop|unsubscribe)\s*\(' "$f"
done

# Inspect this hook's cleanup and live effect behavior.
rg -n -C4 'options\.live|client\.subscribe\(|client\.unsubscribe\(|client\.stop\(' packages/typescript/ai-preact/src/use-chat.ts

Repository: TanStack/ai
Exit code: 0

stdout:

=== packages/typescript/ai-client/src/chat-client.ts ===
693-  /**
694-   * Unsubscribe and fully tear down live behavior.
695-   * This aborts an in-flight request and the subscription loop.
696-   */
697:  unsubscribe(): void {
698-    this.cancelInFlightStream({
699-      setReadyStatus: true,
700-      abortSubscription: true,
701-    })
--
734-
735-  /**
736-   * Stop the current stream
737-   */
738:  stop(): void {
739-    this.cancelInFlightStream({ setReadyStatus: true })
740-    this.events.stopped()
741-  }
742-
113-    }
114-  }, [])
115-
116-  useEffect(() => {
117:    if (options.live) {
118:      client.subscribe()
119-    } else {
120:      client.unsubscribe()
121-    }
122:  }, [client, options.live])
123-
124-  // Cleanup on unmount: stop any in-flight requests
125-  // Note: We only cleanup when client changes or component unmounts.
126-  // DO NOT include isLoading in dependencies - that would cause the cleanup
127-  // to run when isLoading changes, aborting continuation requests.
128-  useEffect(() => {
129-    return () => {
130:      if (options.live) {
131:        client.unsubscribe()
132-      } else {
133:        client.stop()
134-      }
135-    }
136:  }, [client, options.live])
137-
138-  // Note: Callback options (onResponse, onChunk, onFinish, onError, onToolCall)
139-  // are captured at client creation time. Changes to these callbacks require
140-  // remounting the component or changing the connection to recreate the client.
--
156-    await client.reload()
157-  }, [client])
158-
159-  const stop = useCallback(() => {
160:    client.stop()
161-  }, [client])
162-
163-  const clear = useCallback(() => {
164-    client.clear()

Call stop() during live-mode cleanup to ensure stopped event fires.

On Line 130–134, live-mode cleanup calls only unsubscribe(), which tears down the subscription and aborts in-flight work but does not emit the stopped() event. Non-live cleanup calls stop(), which aborts in-flight work and emits the event. For consistent cleanup notification, live mode should also call stop() before unsubscribe().

🔧 Proposed fix
   return () => {
-    if (options.live) {
-      client.unsubscribe()
-    } else {
-      client.stop()
-    }
+    client.stop()
+    if (options.live) {
+      client.unsubscribe()
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (options.live) {
  client.unsubscribe()
} else {
  client.stop()
}
return () => {
  client.stop()
  if (options.live) {
    client.unsubscribe()
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-preact/src/use-chat.ts` around lines 130 - 134, The
live-mode cleanup only calls client.unsubscribe(), which aborts work but doesn't
emit the stopped() event; modify the cleanup in useChat to call client.stop()
before client.unsubscribe() when options.live is true (i.e., change the branch
handling the client in the cleanup to invoke client.stop() then
client.unsubscribe()) so the stopped event fires consistently with the non-live
path that calls client.stop().

Comment on lines 91 to 97
onScopeDispose(() => {
  client.stop()
  if (options.live) {
    client.unsubscribe()
  } else {
    client.stop()
  }
})

⚠️ Potential issue | 🟠 Major

Dispose should always call unsubscribe() to fully tear down connections.

At Line 95, using client.stop() for non-live mode can leave the subscription loop running after unmount (since requests still call subscribe() internally). This risks dangling background processing.

💡 Proposed fix
   onScopeDispose(() => {
-    if (options.live) {
-      client.unsubscribe()
-    } else {
-      client.stop()
-    }
+    client.unsubscribe()
   })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-vue/src/use-chat.ts` around lines 91 - 97, The dispose
logic should always call client.unsubscribe() to fully tear down any
subscription loops: replace the conditional in onScopeDispose that calls
client.stop() when options.live is false so that onScopeDispose always invokes
client.unsubscribe() (retain client.stop() only if there's additional teardown
needed, but ensure client.unsubscribe() is executed unconditionally) — update
the onScopeDispose closure referencing options.live and client so unsubscribe()
is always called.

Comment on lines +646 to +663
if (pendingId !== messageId) {
  // Update the message's ID in the messages array
  this.messages = this.messages.map((msg) =>
    msg.id === pendingId ? { ...msg, id: messageId } : msg,
  )

  // Move state to the new key
  const existingState = this.messageStates.get(pendingId)
  if (existingState) {
    existingState.id = messageId
    this.messageStates.delete(pendingId)
    this.messageStates.set(messageId, existingState)
  }

  // Update activeMessageIds
  this.activeMessageIds.delete(pendingId)
  this.activeMessageIds.add(messageId)
}

⚠️ Potential issue | 🟠 Major

Reconcile toolCallToMessage when pending message ID is remapped.

At Line 646, you remap message/state IDs, but existing tool-call routing entries still point to pendingId. That can cause TOOL_CALL_ARGS/TOOL_CALL_END to no-op for already-started tool calls.

💡 Proposed fix
       if (pendingId !== messageId) {
         // Update the message's ID in the messages array
         this.messages = this.messages.map((msg) =>
           msg.id === pendingId ? { ...msg, id: messageId } : msg,
         )
@@
         // Update activeMessageIds
         this.activeMessageIds.delete(pendingId)
         this.activeMessageIds.add(messageId)
+
+        // Re-point tool-call routing entries created before ID reconciliation
+        for (const [toolCallId, mappedMessageId] of this.toolCallToMessage) {
+          if (mappedMessageId === pendingId) {
+            this.toolCallToMessage.set(toolCallId, messageId)
+          }
+        }
       }
📝 Committable suggestion


Suggested change
if (pendingId !== messageId) {
  // Update the message's ID in the messages array
  this.messages = this.messages.map((msg) =>
    msg.id === pendingId ? { ...msg, id: messageId } : msg,
  )
  // Move state to the new key
  const existingState = this.messageStates.get(pendingId)
  if (existingState) {
    existingState.id = messageId
    this.messageStates.delete(pendingId)
    this.messageStates.set(messageId, existingState)
  }
  // Update activeMessageIds
  this.activeMessageIds.delete(pendingId)
  this.activeMessageIds.add(messageId)
}
if (pendingId !== messageId) {
  // Update the message's ID in the messages array
  this.messages = this.messages.map((msg) =>
    msg.id === pendingId ? { ...msg, id: messageId } : msg,
  )
  // Move state to the new key
  const existingState = this.messageStates.get(pendingId)
  if (existingState) {
    existingState.id = messageId
    this.messageStates.delete(pendingId)
    this.messageStates.set(messageId, existingState)
  }
  // Update activeMessageIds
  this.activeMessageIds.delete(pendingId)
  this.activeMessageIds.add(messageId)
  // Re-point tool-call routing entries created before ID reconciliation
  for (const [toolCallId, mappedMessageId] of this.toolCallToMessage) {
    if (mappedMessageId === pendingId) {
      this.toolCallToMessage.set(toolCallId, messageId)
    }
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai/src/activities/chat/stream/processor.ts` around lines
646 - 663, When remapping a pending message ID (pendingId -> messageId) in the
block that updates this.messages, this.messageStates and this.activeMessageIds,
also reconcile the tool-call routing map toolCallToMessage so existing tool
calls continue to route: look up toolCallToMessage.get(pendingId), and if
present set toolCallToMessage.set(messageId, thatValue) and delete the pendingId
entry; do this in the same if (pendingId !== messageId) branch so
TOOL_CALL_ARGS/TOOL_CALL_END still find the message by the new ID.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
packages/typescript/ai-svelte/src/create-chat.svelte.ts (1)

98-100: ⚠️ Potential issue | 🟠 Major

Expose unsubscribe() for live subscription teardown.

When live: true, client.subscribe() starts a durable subscription, but the returned interface only provides stop(). Per chat-client.ts, stop() cancels request-local work while unsubscribe() tears down the live subscription. Svelte consumers have no way to release the connection on component destroy.

🔧 Proposed fix

In create-chat.svelte.ts:

  const stop = () => {
    client.stop()
  }

+ const unsubscribe = () => {
+   client.unsubscribe()
+ }

  // In return object:
  return {
    // ... existing properties
    stop,
+   unsubscribe,
    // ...
  }

In types.ts, add to CreateChatReturn:

  stop: () => void
+ /**
+  * Unsubscribe from the live subscription loop.
+  * Call this on component destroy when using `live: true`.
+  */
+ unsubscribe: () => void

Also applies to: 102-105, 177-186

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-svelte/src/create-chat.svelte.ts` around lines 98 -
100, When options.live is true, client.subscribe() starts a durable live
subscription but the returned object only exposes stop(), so callers cannot
fully tear down the live connection; update create-chat.svelte.ts to capture the
subscription result from client.subscribe() and expose its unsubscribe() method
alongside stop(), and update the CreateChatReturn type in types.ts to include
unsubscribe(): void (or nullable) so Svelte components can call unsubscribe() on
destroy; ensure you also add unsubscribe to the other similar branches mentioned
(lines ~102-105 and ~177-186) where client.subscribe() is used.
🧹 Nitpick comments (2)
packages/typescript/ai/src/activities/chat/stream/processor.ts (1)

1376-1391: Content/thinking aggregation across messages may yield unexpected results.

getResult() concatenates totalTextContent and thinkingContent from all message states. For multi-message sessions, this produces a single aggregated string without message boundaries. Consider whether this is the intended behavior, or whether consumers expect per-message results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai/src/activities/chat/stream/processor.ts` around lines
1376 - 1391, getResult() currently flattens all messageStates by concatenating
each state's totalTextContent and thinkingContent into single strings, which
loses per-message boundaries; change getResult() (and its return shape) to
preserve per-message results by returning a map or array keyed by message id
(e.g., messageStates.forEach -> build perMessageContents: Record<messageId,
{content, thinking}>) and keep the existing aggregated content only if strictly
needed (or add it as an optional field) so callers can access both per-message
and aggregated data; update any consumers of getResult() to read
perMessageContents (or handle the added optional aggregated field) and reference
messageStates, totalTextContent, and thinkingContent when making the change.
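A minimal sketch of the per-message grouping suggested above, using illustrative names rather than the processor's actual internals:

```typescript
// Hypothetical sketch: preserve per-message boundaries instead of
// concatenating text across all message states.
type MessageState = { id: string; totalTextContent: string; thinkingContent: string }

function buildPerMessageResult(
  states: Map<string, MessageState>,
): Record<string, { content: string; thinking: string }> {
  const perMessage: Record<string, { content: string; thinking: string }> = {}
  for (const [id, state] of states) {
    perMessage[id] = {
      content: state.totalTextContent,
      thinking: state.thinkingContent,
    }
  }
  return perMessage
}

// Two message states from a multi-message session stay distinct:
const states = new Map<string, MessageState>([
  ['m1', { id: 'm1', totalTextContent: 'first reply', thinkingContent: '' }],
  ['m2', { id: 'm2', totalTextContent: 'second reply', thinkingContent: 'plan' }],
])
console.log(buildPerMessageResult(states))
```

An aggregated string could still be derived from this map when callers need it, so exposing the keyed form is strictly more informative.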
packages/typescript/ai/tests/stream-processor.test.ts (1)

2470-2508: Test uses raw chunk without runId - verify this is intentional for backward compat.

This test sends RUN_FINISHED without runId, which tests backward compatibility for streams that don't include run tracking. The test description could clarify this is testing backward compat behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai/tests/stream-processor.test.ts` around lines 2470 -
2508, Update the test description for the "double onStreamEnd guard" case to
explicitly state it's intentionally sending a RUN_FINISHED chunk without a runId
to verify backward-compatibility for streams that don't include run tracking;
locate the test using StreamProcessor and processChunk (the it block that sends
RUN_FINISHED then TEXT_MESSAGE_END) and change the it string to mention
"backward-compat: RUN_FINISHED without runId" so future maintainers understand
this is intentional.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.changeset/tidy-zebras-drum.md:
- Around line 2-7: The changeset is missing an entry for the package that had
public API changes; add '@tanstack/ai-preact': patch to the changeset so the
modified hook in packages/typescript/ai-preact/src/use-chat.ts (the useChat hook
and its returned state) gets published; update the list in
.changeset/tidy-zebras-drum.md to include that package name with the same
release level (patch) as the other packages.

In `@packages/typescript/ai-client/src/chat-client.ts`:
- Around line 310-324: The current cancelInFlightStream() aborts the request but
streamResponse() unconditionally drains postStreamActions in its finally block,
allowing queued continuations (from
append/addToolResult/addToolApprovalResponse) to resurrect a canceled session;
fix by making cancelInFlightStream() set a clear cancellation marker (e.g.,
this.streamCanceled = true) and call
abortSubscriptionLoop()/resolveProcessing()/setStatus as now, then update
streamResponse() and checkForContinuation() to check that
abortController?.signal.aborted or this.streamCanceled before running any
postStreamActions or scheduling continuations and skip draining/continuing when
canceled; also ensure queued postStreamActions are cleared or ignored after
cancel and reset the marker only when a new stream is legitimately started
(e.g., in the method that creates a fresh abortController).
- Around line 394-402: The code is resolving request-level completion
(resolveProcessing) for any terminal chunk on a shared subscription; fix by
correlating terminal chunk runIds to the specific send() request before calling
resolveProcessing: when send() starts, record the request's runId(s) (e.g., a
per-request set like requestRunIds stored on the ChatClient instance or returned
by the server) and then in the RUN_FINISHED / RUN_ERROR branch only delete from
this.activeRunIds and call this.resolveProcessing() if chunk.runId is present
and matches one of the runIds for the current request (or, for session-level
RUN_ERROR with no runId, only resolve if this request created no runId and is
the originator), otherwise skip calling resolveProcessing; use existing symbols
activeRunIds, setSessionGenerating, resolveProcessing, chunk.runId and send() to
locate where to add the request→run correlation check.

In `@packages/typescript/ai-preact/src/use-chat.ts`:
- Around line 128-136: The cleanup in the useEffect currently only calls
client.unsubscribe() for live mode and client.stop() otherwise, which leaves the
subscription open when sendMessage() started a durable subscription; change the
unmount teardown to always call client.unsubscribe() to close the subscription
stream (regardless of options.live) and then conditionally call client.stop()
when not live to abort the in-flight request; locate useEffect and update the
cleanup to first invoke client.unsubscribe(), then if (!options.live) call
client.stop(); references: useEffect, options.live, client.unsubscribe(),
client.stop(), sendMessage(), streamResponse(), subscribe().

In `@packages/typescript/ai-solid/src/use-chat.ts`:
- Around line 117-124: The cleanup currently only calls client().stop() when
options.live is false, which aborts the request but leaves the durable
subscription started by streamResponse()/ensureSubscription() running; modify
the onCleanup block (and/or streamResponse/ensureSubscription if needed) to also
clear that subscription: when unmounting and options.live is false call
client().unsubscribe() (or invoke the unsubscribe/teardown returned by
ensureSubscription/sendMessage) in addition to client().stop() so the background
durable subscription loop is cancelled.

---

Duplicate comments:
In `@packages/typescript/ai-svelte/src/create-chat.svelte.ts`:
- Around line 98-100: When options.live is true, client.subscribe() starts a
durable live subscription but the returned object only exposes stop(), so
callers cannot fully tear down the live connection; update create-chat.svelte.ts
to capture the subscription result from client.subscribe() and expose its
unsubscribe() method alongside stop(), and update the CreateChatReturn type in
types.ts to include unsubscribe(): void (or nullable) so Svelte components can
call unsubscribe() on destroy; ensure you also add unsubscribe to the other
similar branches mentioned (lines ~102-105 and ~177-186) where
client.subscribe() is used.

---

Nitpick comments:
In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 1376-1391: getResult() currently flattens all messageStates by
concatenating each state's totalTextContent and thinkingContent into single
strings, which loses per-message boundaries; change getResult() (and its return
shape) to preserve per-message results by returning a map or array keyed by
message id (e.g., messageStates.forEach -> build perMessageContents:
Record<messageId, {content, thinking}>) and keep the existing aggregated content
only if strictly needed (or add it as an optional field) so callers can access
both per-message and aggregated data; update any consumers of getResult() to
read perMessageContents (or handle the added optional aggregated field) and
reference messageStates, totalTextContent, and thinkingContent when making the
change.

In `@packages/typescript/ai/tests/stream-processor.test.ts`:
- Around line 2470-2508: Update the test description for the "double onStreamEnd
guard" case to explicitly state it's intentionally sending a RUN_FINISHED chunk
without a runId to verify backward-compatibility for streams that don't include
run tracking; locate the test using StreamProcessor and processChunk (the it
block that sends RUN_FINISHED then TEXT_MESSAGE_END) and change the it string to
mention "backward-compat: RUN_FINISHED without runId" so future maintainers
understand this is intentional.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9d48fcea-9c08-43c3-b67b-8d8c789d1906

📥 Commits

Reviewing files that changed from the base of the PR and between b8ae243 and 16f983c.

📒 Files selected for processing (25)
  • .changeset/fresh-donkeys-breathe.md
  • .changeset/tidy-zebras-drum.md
  • packages/typescript/ai-client/src/chat-client.ts
  • packages/typescript/ai-client/src/index.ts
  • packages/typescript/ai-client/src/types.ts
  • packages/typescript/ai-client/tests/chat-client.test.ts
  • packages/typescript/ai-preact/src/types.ts
  • packages/typescript/ai-preact/src/use-chat.ts
  • packages/typescript/ai-preact/tests/use-chat.test.ts
  • packages/typescript/ai-react/src/types.ts
  • packages/typescript/ai-react/src/use-chat.ts
  • packages/typescript/ai-react/tests/use-chat.test.ts
  • packages/typescript/ai-solid/src/types.ts
  • packages/typescript/ai-solid/src/use-chat.ts
  • packages/typescript/ai-solid/tests/test-utils.ts
  • packages/typescript/ai-solid/tests/use-chat.test.ts
  • packages/typescript/ai-svelte/src/create-chat.svelte.ts
  • packages/typescript/ai-svelte/src/types.ts
  • packages/typescript/ai-svelte/tests/use-chat.test.ts
  • packages/typescript/ai-vue/src/types.ts
  • packages/typescript/ai-vue/src/use-chat.ts
  • packages/typescript/ai-vue/tests/test-utils.ts
  • packages/typescript/ai-vue/tests/use-chat.test.ts
  • packages/typescript/ai/src/activities/chat/stream/processor.ts
  • packages/typescript/ai/tests/stream-processor.test.ts
🚧 Files skipped from review as they are similar to previous changes (8)
  • packages/typescript/ai-client/src/index.ts
  • packages/typescript/ai-solid/src/types.ts
  • packages/typescript/ai-react/src/use-chat.ts
  • packages/typescript/ai-preact/src/types.ts
  • packages/typescript/ai-preact/tests/use-chat.test.ts
  • packages/typescript/ai-vue/tests/use-chat.test.ts
  • packages/typescript/ai-react/src/types.ts
  • packages/typescript/ai-solid/tests/use-chat.test.ts

Comment on lines +2 to +7
'@tanstack/ai': patch
'@tanstack/ai-client': patch
'@tanstack/ai-react': patch
'@tanstack/ai-solid': patch
'@tanstack/ai-svelte': patch
'@tanstack/ai-vue': patch

⚠️ Potential issue | 🟠 Major

Add @tanstack/ai-preact to this changeset.

packages/typescript/ai-preact/src/use-chat.ts changes the hook's public behavior and return state, but @tanstack/ai-preact is missing from the release plan here. Without an entry, those changes will not publish.

Suggested update
 '@tanstack/ai': patch
 '@tanstack/ai-client': patch
+'@tanstack/ai-preact': patch
 '@tanstack/ai-react': patch
 '@tanstack/ai-solid': patch
 '@tanstack/ai-svelte': patch
 '@tanstack/ai-vue': patch
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.changeset/tidy-zebras-drum.md around lines 2 - 7, The changeset is missing
an entry for the package that had public API changes; add '@tanstack/ai-preact':
patch to the changeset so the modified hook in
packages/typescript/ai-preact/src/use-chat.ts (the useChat hook and its returned
state) gets published; update the list in .changeset/tidy-zebras-drum.md to
include that package name with the same release level (patch) as the other
packages.

Comment on lines +310 to +324
  private cancelInFlightStream(options?: {
    setReadyStatus?: boolean
    abortSubscription?: boolean
  }): void {
    this.abortController?.abort()
    this.abortController = null
    if (options?.abortSubscription) {
      this.abortSubscriptionLoop()
    }
    this.resolveProcessing()
    this.setIsLoading(false)
    if (options?.setReadyStatus) {
      this.setStatus('ready')
    }
  }

⚠️ Potential issue | 🟠 Major

Canceled streams still drain queued continuations.

cancelInFlightStream() aborts the active request, but the aborted streamResponse() still unconditionally drains postStreamActions in finally. If append(), addToolResult(), or addToolApprovalResponse() queued work before stop(), unsubscribe(), or a connection swap, that work can immediately call streamResponse() / checkForContinuation() and resurrect the session you just canceled.

Possible fix
-        // Drain any actions that were queued while the stream was in progress
-        await this.drainPostStreamActions()
+        if (streamCompletedSuccessfully) {
+          await this.drainPostStreamActions()
+        } else {
+          this.postStreamActions = []
+          this.continuationSkipped = false
+        }

Also applies to: 652-657

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/src/chat-client.ts` around lines 310 - 324, The
current cancelInFlightStream() aborts the request but streamResponse()
unconditionally drains postStreamActions in its finally block, allowing queued
continuations (from append/addToolResult/addToolApprovalResponse) to resurrect a
canceled session; fix by making cancelInFlightStream() set a clear cancellation
marker (e.g., this.streamCanceled = true) and call
abortSubscriptionLoop()/resolveProcessing()/setStatus as now, then update
streamResponse() and checkForContinuation() to check that
abortController?.signal.aborted or this.streamCanceled before running any
postStreamActions or scheduling continuations and skip draining/continuing when
canceled; also ensure queued postStreamActions are cleared or ignored after
cancel and reset the marker only when a new stream is legitimately started
(e.g., in the method that creates a fresh abortController).

Comment on lines +394 to +402
    if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
      if (chunk.runId) {
        this.activeRunIds.delete(chunk.runId)
      } else if (chunk.type === 'RUN_ERROR') {
        // RUN_ERROR without runId is a session-level error; clear all runs
        this.activeRunIds.clear()
      }
      this.setSessionGenerating(this.activeRunIds.size > 0)
      this.resolveProcessing()

⚠️ Potential issue | 🟠 Major

Don't resolve request completion from unrelated runs.

processingComplete is request-local, but this branch resolves it for every terminal chunk seen on the shared subscription. If another run in the same durable session finishes before the run started by this send(), await sendMessage() can return early and the completion path will run against partial local output. This needs request→run correlation before calling resolveProcessing().

Also applies to: 599-610

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/src/chat-client.ts` around lines 394 - 402, The
code is resolving request-level completion (resolveProcessing) for any terminal
chunk on a shared subscription; fix by correlating terminal chunk runIds to the
specific send() request before calling resolveProcessing: when send() starts,
record the request's runId(s) (e.g., a per-request set like requestRunIds stored
on the ChatClient instance or returned by the server) and then in the
RUN_FINISHED / RUN_ERROR branch only delete from this.activeRunIds and call
this.resolveProcessing() if chunk.runId is present and matches one of the runIds
for the current request (or, for session-level RUN_ERROR with no runId, only
resolve if this request created no runId and is the originator), otherwise skip
calling resolveProcessing; use existing symbols activeRunIds,
setSessionGenerating, resolveProcessing, chunk.runId and send() to locate where
to add the request→run correlation check.
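The request→run correlation this prompt calls for can be sketched like this; `RunTracker`, `requestRunIds`, and `resolved` are illustrative names, not the ChatClient's real fields:

```typescript
// Sketch of correlating terminal chunks to the originating send() request.
type TerminalChunk = { type: 'RUN_FINISHED' | 'RUN_ERROR'; runId?: string }

class RunTracker {
  private activeRunIds = new Set<string>()
  private requestRunIds = new Set<string>()
  resolved = false

  // Record the run(s) started by this send() request.
  startRequest(runId: string): void {
    this.activeRunIds.add(runId)
    this.requestRunIds.add(runId)
  }

  // Runs observed on the shared subscription but owned by other requests.
  trackForeignRun(runId: string): void {
    this.activeRunIds.add(runId)
  }

  handleTerminal(chunk: TerminalChunk): void {
    if (chunk.runId) {
      this.activeRunIds.delete(chunk.runId)
      // Only resolve the awaiting send() if this run belongs to it.
      if (this.requestRunIds.delete(chunk.runId)) {
        this.resolved = this.requestRunIds.size === 0
      }
    } else if (chunk.type === 'RUN_ERROR') {
      // Session-level error: everything is torn down.
      this.activeRunIds.clear()
      this.requestRunIds.clear()
      this.resolved = true
    }
  }

  get sessionGenerating(): boolean {
    return this.activeRunIds.size > 0
  }
}
```

With this shape, an unrelated run finishing on the shared subscription updates `sessionGenerating` but leaves the pending `await sendMessage()` unresolved.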

Comment on lines 128 to +136
   useEffect(() => {
     return () => {
-      client.stop()
+      if (options.live) {
+        client.unsubscribe()
+      } else {
+        client.stop()
+      }
     }
-  }, [client])
+  }, [client, options.live])

⚠️ Potential issue | 🟠 Major

Always tear down the subscription stream on unmount.

sendMessage() auto-starts a durable subscription via streamResponse() → subscribe() regardless of options.live. The non-live cleanup path calls stop(), which aborts only the request but leaves the subscription stream open after unmount, causing a resource leak.

Suggested cleanup
   useEffect(() => {
     return () => {
-      if (options.live) {
-        client.unsubscribe()
-      } else {
-        client.stop()
-      }
+      client.unsubscribe()
     }
   }, [client, options.live])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-preact/src/use-chat.ts` around lines 128 - 136, The
cleanup in the useEffect currently only calls client.unsubscribe() for live mode
and client.stop() otherwise, which leaves the subscription open when
sendMessage() started a durable subscription; change the unmount teardown to
always call client.unsubscribe() to close the subscription stream (regardless of
options.live) and then conditionally call client.stop() when not live to abort
the in-flight request; locate useEffect and update the cleanup to first invoke
client.unsubscribe(), then if (!options.live) call client.stop(); references:
useEffect, options.live, client.unsubscribe(), client.stop(), sendMessage(),
streamResponse(), subscribe().

Comment on lines +117 to 124
  // Cleanup on unmount: stop any in-flight requests.
  onCleanup(() => {
    if (options.live) {
      client().unsubscribe()
    } else {
      client().stop()
    }
  })

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -name "use-chat.ts" -type f | head -5

Repository: TanStack/ai

Length of output: 241


🏁 Script executed:

find . -name "*.ts" -type f | xargs grep -l "class ChatClient" | head -5

Repository: TanStack/ai

Length of output: 151


🏁 Script executed:

fd -t f "use-chat.ts" -x cat -n {}

Repository: TanStack/ai

Length of output: 28054


🏁 Script executed:

cat -n ./packages/typescript/ai-client/src/chat-client.ts | head -200

Repository: TanStack/ai

Length of output: 8906


🏁 Script executed:

wc -l ./packages/typescript/ai-client/src/chat-client.ts

Repository: TanStack/ai

Length of output: 110


🏁 Script executed:

cat -n ./packages/typescript/ai-client/src/chat-client.ts

Repository: TanStack/ai

Length of output: 39258


Non-live cleanup still leaks auto-started subscriptions.

client().stop() only aborts the in-flight request. streamResponse() auto-starts the background subscription via ensureSubscription() on every sendMessage(), so unmounting with live === false leaves the durable subscription loop running with no owner.

Suggested cleanup
   onCleanup(() => {
-    if (options.live) {
-      client().unsubscribe()
-    } else {
-      client().stop()
-    }
+    client().unsubscribe()
   })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-solid/src/use-chat.ts` around lines 117 - 124, The
cleanup currently only calls client().stop() when options.live is false, which
aborts the request but leaves the durable subscription started by
streamResponse()/ensureSubscription() running; modify the onCleanup block
(and/or streamResponse/ensureSubscription if needed) to also clear that
subscription: when unmounting and options.live is false call
client().unsubscribe() (or invoke the unsubscribe/teardown returned by
ensureSubscription/sendMessage) in addition to client().stop() so the background
durable subscription loop is cancelled.
